Alerting: Add compressed protobuf-based alert state storage (#99193)

Alexander Akhmetov authored on 2025-01-27 18:47:33 +01:00, committed by GitHub
parent 6edd4f5a7c
commit cb43f4b696
24 changed files with 1437 additions and 134 deletions


@@ -421,6 +421,7 @@ protobuf: ## Compile protobuf definitions
	buf generate pkg/plugins/backendplugin/secretsmanagerplugin --template pkg/plugins/backendplugin/secretsmanagerplugin/buf.gen.yaml
	buf generate pkg/storage/unified/resource --template pkg/storage/unified/resource/buf.gen.yaml
	buf generate pkg/services/authz/proto/v1 --template pkg/services/authz/proto/v1/buf.gen.yaml
+	buf generate pkg/services/ngalert/store/proto/v1 --template pkg/services/ngalert/store/proto/v1/buf.gen.yaml

.PHONY: clean
clean: ## Clean up intermediate build artifacts.


@ -191,6 +191,7 @@ Experimental features might be changed or removed without prior notice.
| `tableSharedCrosshair` | Enables shared crosshair in table panel | | `tableSharedCrosshair` | Enables shared crosshair in table panel |
| `kubernetesFeatureToggles` | Use the kubernetes API for feature toggle management in the frontend | | `kubernetesFeatureToggles` | Use the kubernetes API for feature toggle management in the frontend |
| `newFolderPicker` | Enables the nested folder picker without having nested folders enabled | | `newFolderPicker` | Enables the nested folder picker without having nested folders enabled |
| `alertingSaveStateCompressed` | Enables the compressed protobuf-based alert state storage |
| `scopeApi` | In-development feature flag for the scope api using the app platform. | | `scopeApi` | In-development feature flag for the scope api using the app platform. |
| `sqlExpressions` | Enables using SQL and DuckDB functions as Expressions. | | `sqlExpressions` | Enables using SQL and DuckDB functions as Expressions. |
| `nodeGraphDotLayout` | Changed the layout algorithm for the node graph | | `nodeGraphDotLayout` | Changed the layout algorithm for the node graph |


@@ -1501,13 +1501,17 @@ github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/grafana/alerting v0.0.0-20250115195200-209e052dba64/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU=
github.com/grafana/authlib v0.0.0-20250120144156-d6737a7dc8f5/go.mod h1:V63rh3udd7sqXJeaG+nGUmViwVnM/bY6t8U9Tols2GU=
+github.com/grafana/authlib v0.0.0-20250120145936-5f0e28e7a87c/go.mod h1:/gYfphsNu9v1qYWXxpv1NSvMEMSwvdf8qb8YlgwIRl8=
github.com/grafana/authlib/types v0.0.0-20250120144156-d6737a7dc8f5/go.mod h1:qYjSd1tmJiuVoSICp7Py9/zD54O9uQQA3wuM6Gg4DFM=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/grafana-app-sdk v0.29.0/go.mod h1:XLt308EmK6kvqPlzjUyXxbwZKEk2vur/eiypUNDay5I=
+github.com/grafana/grafana/apps/advisor v0.0.0-20250121115006-c1eac9f9973f/go.mod h1:goSDiy3jtC2cp8wjpPZdUHRENcoSUHae1/Px/MDfddA=
+github.com/grafana/grafana/pkg/promlib v0.0.7/go.mod h1:rnwJXCA2xRwb7F27NB35iO/JsLL/H/+eVXECk/hrEhQ=
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A=
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU=
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0/go.mod h1:7t5XR+2IA8P2qggOAHTj/GCZfoLBle3OvNSYh1VkRBU=


@@ -154,6 +154,7 @@ export interface FeatureToggles {
  jitterAlertRulesWithinGroups?: boolean;
  onPremToCloudMigrations?: boolean;
  alertingSaveStatePeriodic?: boolean;
+  alertingSaveStateCompressed?: boolean;
  scopeApi?: boolean;
  promQLScope?: boolean;
  logQLScope?: boolean;


@@ -1027,6 +1027,13 @@ var (
			FrontendOnly: false,
			Owner:        grafanaAlertingSquad,
		},
+		{
+			Name:         "alertingSaveStateCompressed",
+			Description:  "Enables the compressed protobuf-based alert state storage",
+			Stage:        FeatureStageExperimental,
+			FrontendOnly: false,
+			Owner:        grafanaAlertingSquad,
+		},
		{
			Name:        "scopeApi",
			Description: "In-development feature flag for the scope api using the app platform.",


@ -135,6 +135,7 @@ newFolderPicker,experimental,@grafana/grafana-frontend-platform,false,false,true
jitterAlertRulesWithinGroups,preview,@grafana/alerting-squad,false,true,false jitterAlertRulesWithinGroups,preview,@grafana/alerting-squad,false,true,false
onPremToCloudMigrations,preview,@grafana/grafana-operator-experience-squad,false,false,false onPremToCloudMigrations,preview,@grafana/grafana-operator-experience-squad,false,false,false
alertingSaveStatePeriodic,privatePreview,@grafana/alerting-squad,false,false,false alertingSaveStatePeriodic,privatePreview,@grafana/alerting-squad,false,false,false
alertingSaveStateCompressed,experimental,@grafana/alerting-squad,false,false,false
scopeApi,experimental,@grafana/grafana-app-platform-squad,false,false,false scopeApi,experimental,@grafana/grafana-app-platform-squad,false,false,false
promQLScope,GA,@grafana/oss-big-tent,false,false,false promQLScope,GA,@grafana/oss-big-tent,false,false,false
logQLScope,privatePreview,@grafana/observability-logs,false,false,false logQLScope,privatePreview,@grafana/observability-logs,false,false,false



@@ -551,6 +551,10 @@ const (
	// Writes the state periodically to the database, asynchronous to rule evaluation
	FlagAlertingSaveStatePeriodic = "alertingSaveStatePeriodic"

+	// FlagAlertingSaveStateCompressed
+	// Enables the compressed protobuf-based alert state storage
+	FlagAlertingSaveStateCompressed = "alertingSaveStateCompressed"
+
	// FlagScopeApi
	// In-development feature flag for the scope api using the app platform.
	FlagScopeApi = "scopeApi"
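Code elsewhere in this commit selects the storage implementation by checking this constant through the feature-toggle service. A minimal sketch of that check, assuming an injected featuremgmt.FeatureToggles value named featureToggles (see initInstanceStore in the ngalert.go hunks below for the real selection logic):

	// Sketch only: gate the alert-instance storage on the new toggle.
	// featureToggles is assumed to be an injected featuremgmt.FeatureToggles.
	if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStateCompressed) {
		// compressed path: instances for a rule are written together as a
		// protobuf-encoded, compressed payload (store.ProtoInstanceDBStore)
	} else {
		// default path: one row per alert instance (store.InstanceDBStore)
	}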


@@ -339,6 +339,21 @@
      "expression": "false"
    }
  },
+  {
+    "metadata": {
+      "name": "alertingSaveStateCompressed",
+      "resourceVersion": "1737472824047",
+      "creationTimestamp": "2025-01-17T18:17:20Z",
+      "annotations": {
+        "grafana.app/updatedTimestamp": "2025-01-21 15:20:24.047499 +0000 UTC"
+      }
+    },
+    "spec": {
+      "description": "Enables the compressed protobuf-based alert state storage",
+      "stage": "experimental",
+      "codeowner": "@grafana/alerting-squad"
+    }
+  },
  {
    "metadata": {
      "name": "alertingSaveStatePeriodic",


@@ -144,6 +144,7 @@ type AlertNG struct {
	dashboardService  dashboards.DashboardService
	Api               *api.API
	httpClientProvider httpclient.Provider
+	InstanceStore     state.InstanceStore

	// Alerting notification services
	MultiOrgAlertmanager *notifier.MultiOrgAlertmanager
@@ -398,11 +399,14 @@ func (ng *AlertNG) init() error {
	if err != nil {
		return err
	}
-	cfg := state.ManagerCfg{
+	ng.InstanceStore = initInstanceStore(ng.store.SQLStore, ng.Log.New("ngalert.state.instancestore"), ng.FeatureToggles)
+	stateManagerCfg := state.ManagerCfg{
		Metrics:          ng.Metrics.GetStateMetrics(),
		ExternalURL:      appUrl,
		DisableExecution: !ng.Cfg.UnifiedAlerting.ExecuteAlerts,
-		InstanceStore:    ng.store,
+		InstanceStore:    ng.InstanceStore,
		Images:           ng.ImageService,
		Clock:            clk,
		Historian:        history,
@@ -415,13 +419,8 @@
		Log:               log.New("ngalert.state.manager"),
		ResolvedRetention: ng.Cfg.UnifiedAlerting.ResolvedAlertRetention,
	}
-	logger := log.New("ngalert.state.manager.persist")
-	statePersister := state.NewSyncStatePersisiter(logger, cfg)
-	if ng.FeatureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStatePeriodic) {
-		ticker := clock.New().Ticker(ng.Cfg.UnifiedAlerting.StatePeriodicSaveInterval)
-		statePersister = state.NewAsyncStatePersister(logger, ticker, cfg)
-	}
-	stateManager := state.NewManager(cfg, statePersister)
+	statePersister := initStatePersister(ng.Cfg.UnifiedAlerting, stateManagerCfg, ng.FeatureToggles)
+	stateManager := state.NewManager(stateManagerCfg, statePersister)
	scheduler := schedule.NewScheduler(schedCfg, stateManager)

	// if it is required to include folder title to the alerts, we need to subscribe to changes of alert title
@@ -515,6 +514,54 @@
	return DeclareFixedRoles(ng.AccesscontrolService, ng.FeatureToggles)
}

+func initInstanceStore(sqlStore db.DB, logger log.Logger, featureToggles featuremgmt.FeatureToggles) state.InstanceStore {
+	var instanceStore state.InstanceStore
+	if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStateCompressed) {
+		logger.Info("Using protobuf-based alert instance store")
+		instanceStore = store.ProtoInstanceDBStore{
+			SQLStore:       sqlStore,
+			Logger:         logger,
+			FeatureToggles: featureToggles,
+		}
+
+		// If FlagAlertingSaveStateCompressed is enabled, ProtoInstanceDBStore is used,
+		// which functions differently from InstanceDBStore. FlagAlertingSaveStatePeriodic is
+		// not applicable to ProtoInstanceDBStore, so a warning is logged if it is set.
+		if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStatePeriodic) {
+			logger.Warn("alertingSaveStatePeriodic is not used when alertingSaveStateCompressed feature flag enabled")
+		}
+	} else {
+		logger.Info("Using simple database alert instance store")
+		instanceStore = store.InstanceDBStore{
+			SQLStore:       sqlStore,
+			Logger:         logger,
+			FeatureToggles: featureToggles,
+		}
+	}
+	return instanceStore
+}
+
+func initStatePersister(uaCfg setting.UnifiedAlertingSettings, cfg state.ManagerCfg, featureToggles featuremgmt.FeatureToggles) state.StatePersister {
+	logger := log.New("ngalert.state.manager.persist")
+	var statePersister state.StatePersister
+	if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStateCompressed) {
+		logger.Info("Using rule state persister")
+		statePersister = state.NewSyncRuleStatePersisiter(logger, cfg)
+	} else if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStatePeriodic) {
+		logger.Info("Using periodic state persister")
+		ticker := clock.New().Ticker(uaCfg.StatePeriodicSaveInterval)
+		statePersister = state.NewAsyncStatePersister(logger, ticker, cfg)
+	} else {
+		logger.Info("Using sync state persister")
+		statePersister = state.NewSyncStatePersisiter(logger, cfg)
+	}
+	return statePersister
+}
+
func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore api.RuleStore) {
	// if full path to the folder is changed, we update all alert rules in that folder to make sure that all peers (in HA mode) will update folder title and
	// clean up the current state
@@ -553,7 +600,7 @@ func (ng *AlertNG) Run(ctx context.Context) error {
	// Also note that this runs synchronously to ensure state is loaded
	// before rule evaluation begins, hence we use ctx and not subCtx.
	//
-	ng.stateManager.Warm(ctx, ng.store, ng.store)
+	ng.stateManager.Warm(ctx, ng.store, ng.InstanceStore)

	children.Go(func() error {
		return ng.schedule.Run(subCtx)

@@ -14,12 +14,16 @@ import (
	"github.com/grafana/grafana/pkg/bus"
	"github.com/grafana/grafana/pkg/events"
+	"github.com/grafana/grafana/pkg/infra/db"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/tracing"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/folder"
	acfakes "github.com/grafana/grafana/pkg/services/ngalert/accesscontrol/fakes"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
+	"github.com/grafana/grafana/pkg/services/ngalert/state"
+	"github.com/grafana/grafana/pkg/services/ngalert/store"
	"github.com/grafana/grafana/pkg/services/ngalert/tests/fakes"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/util"
@@ -192,3 +196,94 @@ grafana_alerting_state_history_info{backend="noop"} 0
		require.NoError(t, err)
	})
}
+
+type mockDB struct {
+	db.DB
+}
+
+func TestInitInstanceStore(t *testing.T) {
+	sqlStore := &mockDB{}
+	logger := log.New()
+
+	tests := []struct {
+		name                      string
+		ft                        featuremgmt.FeatureToggles
+		expectedInstanceStoreType interface{}
+	}{
+		{
+			name: "Compressed flag enabled, no periodic flag",
+			ft: featuremgmt.WithFeatures(
+				featuremgmt.FlagAlertingSaveStateCompressed,
+			),
+			expectedInstanceStoreType: store.ProtoInstanceDBStore{},
+		},
+		{
+			name: "Compressed flag enabled with periodic flag",
+			ft: featuremgmt.WithFeatures(
+				featuremgmt.FlagAlertingSaveStateCompressed,
+				featuremgmt.FlagAlertingSaveStatePeriodic,
+			),
+			expectedInstanceStoreType: store.ProtoInstanceDBStore{},
+		},
+		{
+			name:                      "Compressed flag disabled",
+			ft:                        featuremgmt.WithFeatures(),
+			expectedInstanceStoreType: store.InstanceDBStore{},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			instanceStore := initInstanceStore(sqlStore, logger, tt.ft)
+			assert.IsType(t, tt.expectedInstanceStoreType, instanceStore)
+		})
+	}
+}
+
+func TestInitStatePersister(t *testing.T) {
+	ua := setting.UnifiedAlertingSettings{
+		StatePeriodicSaveInterval: 1 * time.Minute,
+	}
+	cfg := state.ManagerCfg{}
+
+	tests := []struct {
+		name                       string
+		ft                         featuremgmt.FeatureToggles
+		expectedStatePersisterType state.StatePersister
+	}{
+		{
+			name: "Compressed flag enabled",
+			ft: featuremgmt.WithFeatures(
+				featuremgmt.FlagAlertingSaveStateCompressed,
+			),
+			expectedStatePersisterType: &state.SyncRuleStatePersister{},
+		},
+		{
+			name: "Periodic flag enabled",
+			ft: featuremgmt.WithFeatures(
+				featuremgmt.FlagAlertingSaveStatePeriodic,
+			),
+			expectedStatePersisterType: &state.AsyncStatePersister{},
+		},
+		{
+			name:                       "No flags enabled",
+			ft:                         featuremgmt.WithFeatures(),
+			expectedStatePersisterType: &state.SyncStatePersister{},
+		},
+		{
+			name: "Both flags enabled - compressed takes precedence",
+			ft: featuremgmt.WithFeatures(
+				featuremgmt.FlagAlertingSaveStateCompressed,
+				featuremgmt.FlagAlertingSaveStatePeriodic,
+			),
+			expectedStatePersisterType: &state.SyncRuleStatePersister{},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			statePersister := initStatePersister(ua, cfg, tt.ft)
+			assert.IsType(t, tt.expectedStatePersisterType, statePersister)
+		})
+	}
+}


@@ -47,7 +47,7 @@ func TestWarmStateCache(t *testing.T) {
	evaluationTime, err := time.Parse("2006-01-02", "2021-03-25")
	require.NoError(t, err)
	ctx := context.Background()
-	_, dbstore := tests.SetupTestEnv(t, 1)
+	ng, dbstore := tests.SetupTestEnv(t, 1)
	const mainOrgID int64 = 1

	rule := tests.CreateTestAlertRule(t, ctx, dbstore, 600, mainOrgID)
@@ -216,13 +216,13 @@
		ResultFingerprint: data.Fingerprint(2).String(),
	})
	for _, instance := range instances {
-		_ = dbstore.SaveAlertInstance(ctx, instance)
+		_ = ng.InstanceStore.SaveAlertInstance(ctx, instance)
	}

	cfg := state.ManagerCfg{
		Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
		ExternalURL:   nil,
-		InstanceStore: dbstore,
+		InstanceStore: ng.InstanceStore,
		Images:        &state.NoopImageService{},
		Clock:         clock.NewMock(),
		Historian:     &state.FakeHistorian{},
@@ -230,7 +230,7 @@
		Log: log.New("ngalert.state.manager"),
	}
	st := state.NewManager(cfg, state.NewNoopPersister())
-	st.Warm(ctx, dbstore, dbstore)
+	st.Warm(ctx, dbstore, ng.InstanceStore)

	t.Run("instance cache has expected entries", func(t *testing.T) {
		for _, entry := range expectedEntries {
@@ -250,7 +250,7 @@ func TestDashboardAnnotations(t *testing.T) {
	require.NoError(t, err)
	ctx := context.Background()
-	_, dbstore := tests.SetupTestEnv(t, 1)
+	ng, dbstore := tests.SetupTestEnv(t, 1)

	fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
	historianMetrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)
@@ -261,7 +261,7 @@
	cfg := state.ManagerCfg{
		Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
		ExternalURL:   nil,
-		InstanceStore: dbstore,
+		InstanceStore: ng.InstanceStore,
		Images:        &state.NoopImageService{},
		Clock:         clock.New(),
		Historian:     hist,
@@ -277,7 +277,7 @@
		"test2": "{{ $labels.instance_label }}",
	})

-	st.Warm(ctx, dbstore, dbstore)
+	st.Warm(ctx, dbstore, ng.InstanceStore)
	bValue := float64(42)
	cValue := float64(1)
	_ = st.ProcessEvalResults(ctx, evaluationTime, rule, eval.Results{{
@@ -1697,7 +1697,7 @@ func TestStaleResultsHandler(t *testing.T) {
	interval := time.Minute
	ctx := context.Background()
-	_, dbstore := tests.SetupTestEnv(t, 1)
+	ng, dbstore := tests.SetupTestEnv(t, 1)
	const mainOrgID int64 = 1

	rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID)
@@ -1751,7 +1751,7 @@
	}

	for _, instance := range instances {
-		_ = dbstore.SaveAlertInstance(ctx, instance)
+		_ = ng.InstanceStore.SaveAlertInstance(ctx, instance)
	}

	testCases := []struct {
@@ -1805,7 +1805,7 @@
	cfg := state.ManagerCfg{
		Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
		ExternalURL:   nil,
-		InstanceStore: dbstore,
+		InstanceStore: ng.InstanceStore,
		Images:        &state.NoopImageService{},
		Clock:         clock.New(),
		Historian:     &state.FakeHistorian{},
@@ -1813,7 +1813,7 @@
		Log: log.New("ngalert.state.manager"),
	}
	st := state.NewManager(cfg, state.NewNoopPersister())
-	st.Warm(ctx, dbstore, dbstore)
+	st.Warm(ctx, dbstore, ng.InstanceStore)
	existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)

	// We have loaded the expected number of entries from the db
@@ -1978,7 +1978,7 @@ func TestStaleResults(t *testing.T) {
func TestDeleteStateByRuleUID(t *testing.T) {
	interval := time.Minute
	ctx := context.Background()
-	_, dbstore := tests.SetupTestEnv(t, 1)
+	ng, dbstore := tests.SetupTestEnv(t, 1)
	const mainOrgID int64 = 1

	rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID)
@@ -2009,7 +2009,7 @@
	}

	for _, instance := range instances {
-		_ = dbstore.SaveAlertInstance(ctx, instance)
+		_ = ng.InstanceStore.SaveAlertInstance(ctx, instance)
	}

	testCases := []struct {
@@ -2025,7 +2025,7 @@
	}{
		{
			desc:          "all states/instances are removed from cache and DB",
-			instanceStore: dbstore,
+			instanceStore: ng.InstanceStore,
			expectedStates: []*state.State{
				{
					AlertRuleUID: rule.UID,
@@ -2065,7 +2065,7 @@
		cfg := state.ManagerCfg{
			Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
			ExternalURL:   nil,
-			InstanceStore: dbstore,
+			InstanceStore: ng.InstanceStore,
			Images:        &state.NoopImageService{},
			Clock:         clk,
			Historian:     &state.FakeHistorian{},
@@ -2073,9 +2073,9 @@
			Log: log.New("ngalert.state.manager"),
		}
		st := state.NewManager(cfg, state.NewNoopPersister())
-		st.Warm(ctx, dbstore, dbstore)
+		st.Warm(ctx, dbstore, ng.InstanceStore)
		q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
-		alerts, _ := dbstore.ListAlertInstances(ctx, q)
+		alerts, _ := ng.InstanceStore.ListAlertInstances(ctx, q)
		existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)

		// We have loaded the expected number of entries from the db
@@ -2107,7 +2107,7 @@
		}

		q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
-		alertInstances, _ := dbstore.ListAlertInstances(ctx, q)
+		alertInstances, _ := ng.InstanceStore.ListAlertInstances(ctx, q)
		existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID)

		// The expected number of state entries remains after states are deleted
@@ -2120,7 +2120,7 @@
func TestResetStateByRuleUID(t *testing.T) {
	interval := time.Minute
	ctx := context.Background()
-	_, dbstore := tests.SetupTestEnv(t, 1)
+	ng, dbstore := tests.SetupTestEnv(t, 1)
	const mainOrgID int64 = 1

	rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID)
@@ -2151,7 +2151,7 @@
	}

	for _, instance := range instances {
-		_ = dbstore.SaveAlertInstance(ctx, instance)
+		_ = ng.InstanceStore.SaveAlertInstance(ctx, instance)
	}

	testCases := []struct {
@@ -2168,7 +2168,7 @@
	}{
		{
			desc:          "all states/instances are removed from cache and DB and saved in historian",
-			instanceStore: dbstore,
+			instanceStore: ng.InstanceStore,
			expectedStates: []*state.State{
				{
					AlertRuleUID: rule.UID,
@@ -2206,7 +2206,7 @@
		cfg := state.ManagerCfg{
			Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
			ExternalURL:   nil,
-			InstanceStore: dbstore,
+			InstanceStore: ng.InstanceStore,
			Images:        &state.NoopImageService{},
			Clock:         clk,
			Historian:     fakeHistorian,
@@ -2214,9 +2214,9 @@
			Log: log.New("ngalert.state.manager"),
		}
		st := state.NewManager(cfg, state.NewNoopPersister())
-		st.Warm(ctx, dbstore, dbstore)
+		st.Warm(ctx, dbstore, ng.InstanceStore)
		q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
-		alerts, _ := dbstore.ListAlertInstances(ctx, q)
+		alerts, _ := ng.InstanceStore.ListAlertInstances(ctx, q)
		existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)

		// We have loaded the expected number of entries from the db
@@ -2251,7 +2251,7 @@
		assert.Equal(t, transitions, fakeHistorian.StateTransitions)

		q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
-		alertInstances, _ := dbstore.ListAlertInstances(ctx, q)
+		alertInstances, _ := ng.InstanceStore.ListAlertInstances(ctx, q)
		existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID)

		// The expected number of state entries remains after states are deleted


@@ -9,14 +9,21 @@ import (
	"time"

	"github.com/grafana/grafana/pkg/infra/db"
+	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/sqlstore"
)

+type InstanceDBStore struct {
+	SQLStore       db.DB
+	Logger         log.Logger
+	FeatureToggles featuremgmt.FeatureToggles
+}
+
// ListAlertInstances is a handler for retrieving alert instances within specific organisation
// based on various filters.
-func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) (result []*models.AlertInstance, err error) {
+func (st InstanceDBStore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) (result []*models.AlertInstance, err error) {
	err = st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
		alertInstances := make([]*models.AlertInstance, 0)
@@ -51,7 +58,7 @@ func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertI
}

// SaveAlertInstance is a handler for saving a new alert instance.
-func (st DBstore) SaveAlertInstance(ctx context.Context, alertInstance models.AlertInstance) error {
+func (st InstanceDBStore) SaveAlertInstance(ctx context.Context, alertInstance models.AlertInstance) error {
	return st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
		if err := models.ValidateAlertInstance(alertInstance); err != nil {
			return err
@@ -89,7 +96,7 @@ func (st DBstore) SaveAlertInstance(ctx context.Context, alertInstance models.Al
	})
}

-func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) {
+func (st InstanceDBStore) FetchOrgIds(ctx context.Context) ([]int64, error) {
	orgIds := []int64{}

	err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
@@ -113,7 +120,7 @@ func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) {
}

// DeleteAlertInstances deletes instances with the provided keys in a single transaction.
-func (st DBstore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error {
+func (st InstanceDBStore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error {
	if len(keys) == 0 {
		return nil
	}
@@ -212,12 +219,13 @@ func (st DBstore) DeleteAlertInstances(ctx context.Context, keys ...models.Alert
}

// SaveAlertInstancesForRule is not implemented for instance database store.
-func (st DBstore) SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error {
+func (st InstanceDBStore) SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error {
	st.Logger.Error("SaveAlertInstancesForRule is not implemented for instance database store.")
	return errors.New("method SaveAlertInstancesForRule is not implemented for instance database store")
}

-func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error {
+// DeleteAlertInstancesByRule deletes all instances for a given rule.
+func (st InstanceDBStore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error {
	return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error {
		_, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ?", key.OrgID, key.UID)
		return err
@@ -230,7 +238,7 @@ func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.Ale
//
// The batchSize parameter controls how many instances are inserted per batch. Increasing batchSize can improve
// performance for large datasets, but can also increase load on the database.
-func (st DBstore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error {
+func (st InstanceDBStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error {
	if len(instances) == 0 {
		return nil
	}
@@ -267,7 +275,7 @@ func (st DBstore) FullSync(ctx context.Context, instances []models.AlertInstance
	})
}

-func (st DBstore) insertInstancesBatch(sess *sqlstore.DBSession, batch []models.AlertInstance) error {
+func (st InstanceDBStore) insertInstancesBatch(sess *sqlstore.DBSession, batch []models.AlertInstance) error {
	// If the batch is empty, nothing to insert.
	if len(batch) == 0 {
		return nil


@@ -0,0 +1,105 @@
package store_test

import (
	"context"
	"flag"
	"fmt"
	"testing"

	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/tests"
)

var saveStateCompressed = flag.Bool("save-state-compressed", false, "Save state compressed")

func BenchmarkSaveAlertInstances(b *testing.B) {
	ctx := context.Background()

	opts := []tests.TestEnvOption{}
	if *saveStateCompressed {
		opts = append(opts, tests.WithFeatureToggles(
			featuremgmt.WithFeatures(featuremgmt.FlagAlertingSaveStateCompressed)),
		)
	}

	benchmarkRun := func(b *testing.B, instanceCount, labelCount int) {
		ng, dbstore := tests.SetupTestEnv(b, baseIntervalSeconds, opts...)
		const mainOrgID int64 = 1

		alertRule := tests.CreateTestAlertRule(b, ctx, dbstore, 60, mainOrgID)

		// Create some instances to write down and then delete.
		instances := make([]models.AlertInstance, 0, instanceCount)
		keys := make([]models.AlertInstanceKey, 0, instanceCount)
		for i := 0; i < instanceCount; i++ {
			labels := models.InstanceLabels{"instance": fmt.Sprintf("instance-%d", i)}
			for li := 0; li < labelCount; li++ {
				labels[fmt.Sprintf("label-%d", li)] = fmt.Sprintf("value-%d", li)
			}
			_, labelsHash, _ := labels.StringAndHash()
			instance := models.AlertInstance{
				AlertInstanceKey: models.AlertInstanceKey{
					RuleOrgID:  alertRule.OrgID,
					RuleUID:    alertRule.UID,
					LabelsHash: labelsHash,
				},
				CurrentState:  models.InstanceStateFiring,
				CurrentReason: string(models.InstanceStateError),
				Labels:        labels,
			}
			instances = append(instances, instance)
			keys = append(keys, instance.AlertInstanceKey)
		}

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			var err error
			if *saveStateCompressed {
				err = ng.InstanceStore.SaveAlertInstancesForRule(ctx, alertRule.GetKeyWithGroup(), instances)
				if err != nil {
					b.Fatalf("error: %s", err)
				}
				// Clean up instances.
				b.StopTimer()
				err = ng.InstanceStore.DeleteAlertInstancesByRule(ctx, alertRule.GetKeyWithGroup())
				if err != nil {
					b.Fatalf("error: %s", err)
				}
				b.StartTimer()
			} else {
				for _, instance := range instances {
					err = ng.InstanceStore.SaveAlertInstance(ctx, instance)
					if err != nil {
						b.Fatalf("error: %s", err)
					}
				}
				// Clean up instances.
				b.StopTimer()
				err = ng.InstanceStore.DeleteAlertInstances(ctx, keys...)
				if err != nil {
					b.Fatalf("error: %s", err)
				}
				b.StartTimer()
			}
		}
	}

	b.Run("100 instances with 10 labels each", func(b *testing.B) {
		benchmarkRun(b, 100, 10)
	})
	b.Run("100 instances with 100 labels each", func(b *testing.B) {
		benchmarkRun(b, 100, 100)
	})
	b.Run("1000 instances with 10 labels each", func(b *testing.B) {
		benchmarkRun(b, 1000, 10)
	})
}
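The -save-state-compressed flag defined above switches the timed loop from one SaveAlertInstance call per instance to a single SaveAlertInstancesForRule call per iteration, so the two storage paths can be compared directly. A hypothetical invocation, assuming the file sits under pkg/services/ngalert/store and a test database is configured:

	go test -run=^$ -bench=BenchmarkSaveAlertInstances ./pkg/services/ngalert/store/
	go test -run=^$ -bench=BenchmarkSaveAlertInstances ./pkg/services/ngalert/store/ -args -save-state-compressed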


@@ -1,57 +1,188 @@
package store_test

import (
+	"bytes"
	"context"
	"fmt"
	"testing"
	"time"

+	"github.com/golang/snappy"
	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"

+	"github.com/grafana/grafana/pkg/infra/db"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
+	pb "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1"
	"github.com/grafana/grafana/pkg/services/ngalert/tests"
	"github.com/grafana/grafana/pkg/util"
)

const baseIntervalSeconds = 10

-func BenchmarkAlertInstanceOperations(b *testing.B) {
-	b.StopTimer()
+func TestIntegration_CompressedAlertRuleStateOperations(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test")
+	}
	ctx := context.Background()
-	_, dbstore := tests.SetupTestEnv(b, baseIntervalSeconds)
+	ng, dbstore := tests.SetupTestEnv(
+		t,
+		baseIntervalSeconds,
+		tests.WithFeatureToggles(
+			featuremgmt.WithFeatures(featuremgmt.FlagAlertingSaveStateCompressed),
+		),
+	)
	const mainOrgID int64 = 1
-	alertRule := tests.CreateTestAlertRule(b, ctx, dbstore, 60, mainOrgID)
-
-	// Create some instances to write down and then delete.
-	count := 10_003
-	instances := make([]models.AlertInstance, 0, count)
-	keys := make([]models.AlertInstanceKey, 0, count)
-	for i := 0; i < count; i++ {
-		labels := models.InstanceLabels{"test": fmt.Sprint(i)}
-		_, labelsHash, _ := labels.StringAndHash()
-		instance := models.AlertInstance{
-			AlertInstanceKey: models.AlertInstanceKey{
-				RuleOrgID:  alertRule.OrgID,
-				RuleUID:    alertRule.UID,
-				LabelsHash: labelsHash,
-			},
-			CurrentState:  models.InstanceStateFiring,
-			CurrentReason: string(models.InstanceStateError),
-			Labels:        labels,
-		}
-		instances = append(instances, instance)
-		keys = append(keys, instance.AlertInstanceKey)
-	}
-
-	b.StartTimer()
-	for i := 0; i < b.N; i++ {
-		for _, instance := range instances {
-			_ = dbstore.SaveAlertInstance(ctx, instance)
-		}
-		_ = dbstore.DeleteAlertInstances(ctx, keys...)
-	}
-}
+	alertRule1 := tests.CreateTestAlertRule(t, ctx, dbstore, 60, mainOrgID)
+	orgID := alertRule1.OrgID
+	alertRule2 := tests.CreateTestAlertRule(t, ctx, dbstore, 60, mainOrgID)
+	require.Equal(t, orgID, alertRule2.OrgID)
+
+	tests := []struct {
+		name           string
+		setupInstances func() []models.AlertInstance
+		listQuery      *models.ListAlertInstancesQuery
+		validate       func(t *testing.T, alerts []*models.AlertInstance)
+	}{
+		{
+			name: "can save and read alert rule state",
+			setupInstances: func() []models.AlertInstance {
+				return []models.AlertInstance{
+					createAlertInstance(alertRule1.OrgID, alertRule1.UID, "labelsHash1", string(models.InstanceStateError), models.InstanceStateFiring),
+				}
+			},
+			listQuery: &models.ListAlertInstancesQuery{
+				RuleOrgID: alertRule1.OrgID,
+				RuleUID:   alertRule1.UID,
+			},
+			validate: func(t *testing.T, alerts []*models.AlertInstance) {
+				require.Len(t, alerts, 1)
+				require.Equal(t, "labelsHash1", alerts[0].LabelsHash)
+			},
+		},
+		{
+			name: "can save and read alert rule state with multiple instances",
+			setupInstances: func() []models.AlertInstance {
+				return []models.AlertInstance{
+					createAlertInstance(alertRule1.OrgID, alertRule1.UID, "hash1", "", models.InstanceStateFiring),
+					createAlertInstance(alertRule1.OrgID, alertRule1.UID, "hash2", "", models.InstanceStateFiring),
+				}
+			},
+			listQuery: &models.ListAlertInstancesQuery{
+				RuleOrgID: alertRule1.OrgID,
+				RuleUID:   alertRule1.UID,
+			},
+			validate: func(t *testing.T, alerts []*models.AlertInstance) {
+				require.Len(t, alerts, 2)
+				containsHash(t, alerts, "hash1")
+				containsHash(t, alerts, "hash2")
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			instances := tc.setupInstances()
+			err := ng.InstanceStore.SaveAlertInstancesForRule(ctx, alertRule1.GetKeyWithGroup(), instances)
+			require.NoError(t, err)
+
+			alerts, err := ng.InstanceStore.ListAlertInstances(ctx, tc.listQuery)
+			require.NoError(t, err)
+			tc.validate(t, alerts)
+		})
+	}
+}
+
+func TestIntegration_CompressedAlertRuleStateOperations_NoNormalState(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test")
+	}
+	ctx := context.Background()
+	ng, dbstore := tests.SetupTestEnv(
+		t,
+		baseIntervalSeconds,
+		tests.WithFeatureToggles(
+			featuremgmt.WithFeatures(
+				featuremgmt.FlagAlertingSaveStateCompressed,
+				featuremgmt.FlagAlertingNoNormalState,
+			),
+		),
+	)
+	const mainOrgID int64 = 1
+	alertRule1 := tests.CreateTestAlertRule(t, ctx, dbstore, 60, mainOrgID)
+	orgID := alertRule1.OrgID
+
+	tests := []struct {
+		name           string
+		setupInstances func() []models.AlertInstance
+		listQuery      *models.ListAlertInstancesQuery
+		validate       func(t *testing.T, alerts []*models.AlertInstance)
+	}{
+		{
+			name: "should ignore Normal state with no reason if feature flag is enabled",
+			setupInstances: func() []models.AlertInstance {
+				return []models.AlertInstance{
+					createAlertInstance(orgID, util.GenerateShortUID(), util.GenerateShortUID(), "", models.InstanceStateNormal),
+					createAlertInstance(orgID, util.GenerateShortUID(), "errorHash", "error", models.InstanceStateNormal),
+				}
+			},
+			listQuery: &models.ListAlertInstancesQuery{
+				RuleOrgID: orgID,
+			},
+			validate: func(t *testing.T, alerts []*models.AlertInstance) {
+				require.Len(t, alerts, 1)
+				containsHash(t, alerts, "errorHash")
+				for _, instance := range alerts {
+					if instance.CurrentState == models.InstanceStateNormal && instance.CurrentReason == "" {
+						require.Fail(t, "List operation expected to return all states except Normal but the result contains Normal states")
+					}
+				}
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			instances := tc.setupInstances()
+			err := ng.InstanceStore.SaveAlertInstancesForRule(ctx, alertRule1.GetKeyWithGroup(), instances)
+			require.NoError(t, err)
+
+			alerts, err := ng.InstanceStore.ListAlertInstances(ctx, tc.listQuery)
+			require.NoError(t, err)
+			tc.validate(t, alerts)
+		})
+	}
+}
+
+// containsHash is a helper function to check if an instance with
+// a given labels hash exists in the list of alert instances.
+func containsHash(t *testing.T, instances []*models.AlertInstance, hash string) {
+	t.Helper()
+	for _, i := range instances {
+		if i.LabelsHash == hash {
+			return
+		}
+	}
+	require.Fail(t, fmt.Sprintf("%v does not contain an instance with hash %s", instances, hash))
+}
+
+func createAlertInstance(orgID int64, ruleUID, labelsHash, reason string, state models.InstanceStateType) models.AlertInstance {
+	return models.AlertInstance{
+		AlertInstanceKey: models.AlertInstanceKey{
+			RuleOrgID:  orgID,
+			RuleUID:    ruleUID,
+			LabelsHash: labelsHash,
+		},
+		CurrentState:  state,
+		CurrentReason: reason,
+		Labels:        models.InstanceLabels{"label1": "value1"},
+	}
+}
@@ -60,20 +191,10 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) {
		t.Skip("skipping integration test")
	}
	ctx := context.Background()
-	_, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds)
+	ng, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds)
	const mainOrgID int64 = 1

-	containsHash := func(t *testing.T, instances []*models.AlertInstance, hash string) {
-		t.Helper()
-		for _, i := range instances {
-			if i.LabelsHash == hash {
-				return
-			}
-		}
-		require.Fail(t, "%v does not contain an instance with hash %s", instances, hash)
-	}
-
	alertRule1 := tests.CreateTestAlertRule(t, ctx, dbstore, 60, mainOrgID)
	orgID := alertRule1.OrgID
@@ -99,14 +220,14 @@
			CurrentReason: string(models.InstanceStateError),
			Labels:        labels,
		}
-		err := dbstore.SaveAlertInstance(ctx, instance)
+		err := ng.InstanceStore.SaveAlertInstance(ctx, instance)
		require.NoError(t, err)

		listCmd := &models.ListAlertInstancesQuery{
			RuleOrgID: instance.RuleOrgID,
			RuleUID:   instance.RuleUID,
		}
-		alerts, err := dbstore.ListAlertInstances(ctx, listCmd)
+		alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listCmd)
		require.NoError(t, err)

		require.Len(t, alerts, 1)
@@ -128,7 +249,7 @@
			CurrentState: models.InstanceStateNormal,
			Labels:       labels,
		}
-		err := dbstore.SaveAlertInstance(ctx, instance)
+		err := ng.InstanceStore.SaveAlertInstance(ctx, instance)
		require.NoError(t, err)

		listCmd := &models.ListAlertInstancesQuery{
@@ -136,7 +257,7 @@
			RuleUID:   instance.RuleUID,
		}

-		alerts, err := dbstore.ListAlertInstances(ctx, listCmd)
+		alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listCmd)
		require.NoError(t, err)

		require.Len(t, alerts, 1)
@@ -158,7 +279,7 @@
			Labels:       labels,
		}

-		err := dbstore.SaveAlertInstance(ctx, instance1)
+		err := ng.InstanceStore.SaveAlertInstance(ctx, instance1)
		require.NoError(t, err)

		labels = models.InstanceLabels{"test": "testValue2"}
@@ -172,7 +293,7 @@
			CurrentState: models.InstanceStateFiring,
			Labels:       labels,
		}
-		err = dbstore.SaveAlertInstance(ctx, instance2)
+		err = ng.InstanceStore.SaveAlertInstance(ctx, instance2)
		require.NoError(t, err)

		listQuery := &models.ListAlertInstancesQuery{
@@ -180,7 +301,7 @@
			RuleUID:   instance1.RuleUID,
		}

-		alerts, err := dbstore.ListAlertInstances(ctx, listQuery)
+		alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listQuery)
		require.NoError(t, err)

		require.Len(t, alerts, 2)
@@ -191,13 +312,21 @@
			RuleOrgID: orgID,
		}

-		alerts, err := dbstore.ListAlertInstances(ctx, listQuery)
+		alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listQuery)
		require.NoError(t, err)

		require.Len(t, alerts, 4)
	})

	t.Run("should ignore Normal state with no reason if feature flag is enabled", func(t *testing.T) {
+		ng, _ := tests.SetupTestEnv(
+			t,
+			baseIntervalSeconds,
+			tests.WithFeatureToggles(
+				featuremgmt.WithFeatures(featuremgmt.FlagAlertingNoNormalState),
+			),
+		)
		labels := models.InstanceLabels{"test": util.GenerateShortUID()}
		instance1 := models.AlertInstance{
			AlertInstanceKey: models.AlertInstanceKey{
@@ -219,27 +348,16 @@
			CurrentReason: models.StateReasonError,
			Labels:        labels,
		}

-		err := dbstore.SaveAlertInstance(ctx, instance1)
+		err := ng.InstanceStore.SaveAlertInstance(ctx, instance1)
		require.NoError(t, err)

-		err = dbstore.SaveAlertInstance(ctx, instance2)
+		err = ng.InstanceStore.SaveAlertInstance(ctx, instance2)
		require.NoError(t, err)

		listQuery := &models.ListAlertInstancesQuery{
			RuleOrgID: orgID,
		}

-		alerts, err := dbstore.ListAlertInstances(ctx, listQuery)
-		require.NoError(t, err)
-		containsHash(t, alerts, instance1.LabelsHash)
-		f := dbstore.FeatureToggles
-		dbstore.FeatureToggles = featuremgmt.WithFeatures(featuremgmt.FlagAlertingNoNormalState)
-		t.Cleanup(func() {
-			dbstore.FeatureToggles = f
-		})
-		alerts, err = dbstore.ListAlertInstances(ctx, listQuery)
+		alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listQuery)
		require.NoError(t, err)

		containsHash(t, alerts, instance2.LabelsHash)
@@ -264,7 +382,7 @@
			Labels:       labels,
		}

-		err := dbstore.SaveAlertInstance(ctx, instance1)
+		err := ng.InstanceStore.SaveAlertInstance(ctx, instance1)
		require.NoError(t, err)

		instance2 := models.AlertInstance{
@@ -276,7 +394,7 @@
			CurrentState: models.InstanceStateNormal,
			Labels:       instance1.Labels,
		}
-		err = dbstore.SaveAlertInstance(ctx, instance2)
+		err = ng.InstanceStore.SaveAlertInstance(ctx, instance2)
		require.NoError(t, err)

		listQuery := &models.ListAlertInstancesQuery{
@@ -284,7 +402,7 @@
			RuleUID:   alertRule4.UID,
		}

-		alerts, err := dbstore.ListAlertInstances(ctx, listQuery)
+		alerts, err := ng.InstanceStore.ListAlertInstances(ctx, listQuery)
		require.NoError(t, err)

		require.Len(t, alerts, 1)
@@ -300,7 +418,7 @@ func TestIntegrationFullSync(t *testing.T) {
	batchSize := 1

	ctx := context.Background()
-	_, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds)
+	ng, _ := tests.SetupTestEnv(t, baseIntervalSeconds)

	orgID := int64(1)
@@ -312,10 +430,10 @@
	}

	t.Run("Should do a proper full sync", func(t *testing.T) {
-		err := dbstore.FullSync(ctx, instances, batchSize)
+		err := ng.InstanceStore.FullSync(ctx, instances, batchSize)
		require.NoError(t, err)

-		res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+		res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
			RuleOrgID: orgID,
		})
		require.NoError(t, err)
@@ -335,10 +453,10 @@
	})

	t.Run("Should remove non existing entries on sync", func(t *testing.T) {
-		err := dbstore.FullSync(ctx, instances[1:], batchSize)
+		err := ng.InstanceStore.FullSync(ctx, instances[1:], batchSize)
		require.NoError(t, err)

-		res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+		res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
			RuleOrgID: orgID,
		})
		require.NoError(t, err)
@@ -352,10 +470,10 @@
	t.Run("Should add new entries on sync", func(t *testing.T) {
		newRuleUID := "y"
-		err := dbstore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize)
+		err := ng.InstanceStore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize)
		require.NoError(t, err)

-		res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+		res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
			RuleOrgID: orgID,
		})
		require.NoError(t, err)
@@ -377,10 +495,10 @@
	t.Run("Should save all instances when batch size is bigger than 1", func(t *testing.T) {
		batchSize = 2
		newRuleUID := "y"
-		err := dbstore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize)
+		err := ng.InstanceStore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize)
		require.NoError(t, err)

-		res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+		res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
			RuleOrgID: orgID,
		})
		require.NoError(t, err)
@@ -405,16 +523,16 @@
			generateTestAlertInstance(orgID, "preexisting-1"),
			generateTestAlertInstance(orgID, "preexisting-2"),
		}
-		err := dbstore.FullSync(ctx, initialInstances, 5)
+		err := ng.InstanceStore.FullSync(ctx, initialInstances, 5)
		require.NoError(t, err)

		// Now call FullSync with no instances. According to the code, this should return nil
		// and should not delete anything in the table.
-		err = dbstore.FullSync(ctx, []models.AlertInstance{}, 5)
+		err = ng.InstanceStore.FullSync(ctx, []models.AlertInstance{}, 5)
		require.NoError(t, err)

		// Check that the previously inserted instances are still present.
-		res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+		res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
			RuleOrgID: orgID,
		})
		require.NoError(t, err)
@@ -441,11 +559,11 @@
		// Make the invalid instance actually invalid
		invalidInstance.AlertInstanceKey.RuleUID = ""

-		err := dbstore.FullSync(ctx, []models.AlertInstance{validInstance, invalidInstance}, 2)
+		err := ng.InstanceStore.FullSync(ctx, []models.AlertInstance{validInstance, invalidInstance}, 2)
		require.NoError(t, err)

		// Only the valid instance should be saved.
-		res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+		res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
			RuleOrgID: orgID,
		})
		require.NoError(t, err)
@@ -460,10 +578,10 @@
			generateTestAlertInstance(orgID, "batch-test2"),
		}

-		err := dbstore.FullSync(ctx, smallSet, 100)
+		err := ng.InstanceStore.FullSync(ctx, smallSet, 100)
		require.NoError(t, err)

-		res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+		res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
			RuleOrgID: orgID,
		})
		require.NoError(t, err)
@@ -483,7 +601,7 @@
	t.Run("Should handle a large set of instances with a moderate batchSize", func(t *testing.T) {
		// Clear everything first.
-		err := dbstore.FullSync(ctx, []models.AlertInstance{}, 1)
+		err := ng.InstanceStore.FullSync(ctx, []models.AlertInstance{}, 1)
		require.NoError(t, err)

		largeCount := 300
@@ -492,10 +610,10 @@
			largeSet[i] = generateTestAlertInstance(orgID, fmt.Sprintf("large-%d", i))
		}

-		err = dbstore.FullSync(ctx, largeSet, 50)
+		err = ng.InstanceStore.FullSync(ctx, largeSet, 50)
		require.NoError(t, err)

-		res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+		res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
			RuleOrgID: orgID,
		})
		require.NoError(t, err)
@ -503,6 +621,77 @@ func TestIntegrationFullSync(t *testing.T) {
}) })
} }
func TestIntegration_ProtoInstanceDBStore_VerifyCompressedData(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
ctx := context.Background()
ng, dbstore := tests.SetupTestEnv(
t,
baseIntervalSeconds,
tests.WithFeatureToggles(
featuremgmt.WithFeatures(
featuremgmt.FlagAlertingSaveStateCompressed,
),
),
)
alertRule := tests.CreateTestAlertRule(t, ctx, dbstore, 60, 1)
labelsHash := "hash1"
reason := "reason"
state := models.InstanceStateFiring
instances := []models.AlertInstance{
createAlertInstance(alertRule.OrgID, alertRule.UID, labelsHash, reason, state),
}
err := ng.InstanceStore.SaveAlertInstancesForRule(ctx, alertRule.GetKeyWithGroup(), instances)
require.NoError(t, err)
// Query raw data from the database
type compressedRow struct {
OrgID int64 `xorm:"org_id"`
RuleUID string `xorm:"rule_uid"`
Data []byte `xorm:"data"`
}
var rawData compressedRow
err = dbstore.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
_, err := sess.SQL("SELECT * FROM alert_rule_state").Get(&rawData)
return err
})
require.NoError(t, err)
// Decompress and compare
require.NotNil(t, rawData)
decompressedInstances, err := decompressAlertInstances(rawData.Data)
require.NoError(t, err)
require.Len(t, decompressedInstances, 1)
require.Equal(t, instances[0].LabelsHash, decompressedInstances[0].LabelsHash)
require.Equal(t, string(instances[0].CurrentState), decompressedInstances[0].CurrentState)
require.Equal(t, instances[0].CurrentReason, decompressedInstances[0].CurrentReason)
}
func decompressAlertInstances(compressed []byte) ([]*pb.AlertInstance, error) {
if len(compressed) == 0 {
return nil, nil
}
reader := snappy.NewReader(bytes.NewReader(compressed))
var b bytes.Buffer
if _, err := b.ReadFrom(reader); err != nil {
return nil, fmt.Errorf("failed to read compressed data: %w", err)
}
var instances pb.AlertInstances
if err := proto.Unmarshal(b.Bytes(), &instances); err != nil {
return nil, fmt.Errorf("failed to unmarshal protobuf: %w", err)
}
return instances.Instances, nil
}
func generateTestAlertInstance(orgID int64, ruleID string) models.AlertInstance {
	return models.AlertInstance{
		AlertInstanceKey: models.AlertInstanceKey{

View File

@ -0,0 +1,301 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.1
// protoc (unknown)
// source: alert_rule_state.proto
package v1
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type AlertInstance struct {
state protoimpl.MessageState `protogen:"open.v1"`
LabelsHash string `protobuf:"bytes,1,opt,name=labels_hash,json=labelsHash,proto3" json:"labels_hash,omitempty"`
Labels map[string]string `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
CurrentState string `protobuf:"bytes,3,opt,name=current_state,json=currentState,proto3" json:"current_state,omitempty"`
CurrentReason string `protobuf:"bytes,4,opt,name=current_reason,json=currentReason,proto3" json:"current_reason,omitempty"`
CurrentStateSince *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=current_state_since,json=currentStateSince,proto3" json:"current_state_since,omitempty"`
CurrentStateEnd *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=current_state_end,json=currentStateEnd,proto3" json:"current_state_end,omitempty"`
LastEvalTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=last_eval_time,json=lastEvalTime,proto3" json:"last_eval_time,omitempty"`
LastSentAt *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=last_sent_at,json=lastSentAt,proto3" json:"last_sent_at,omitempty"`
ResolvedAt *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=resolved_at,json=resolvedAt,proto3" json:"resolved_at,omitempty"`
ResultFingerprint string `protobuf:"bytes,10,opt,name=result_fingerprint,json=resultFingerprint,proto3" json:"result_fingerprint,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *AlertInstance) Reset() {
*x = AlertInstance{}
mi := &file_alert_rule_state_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *AlertInstance) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AlertInstance) ProtoMessage() {}
func (x *AlertInstance) ProtoReflect() protoreflect.Message {
mi := &file_alert_rule_state_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AlertInstance.ProtoReflect.Descriptor instead.
func (*AlertInstance) Descriptor() ([]byte, []int) {
return file_alert_rule_state_proto_rawDescGZIP(), []int{0}
}
func (x *AlertInstance) GetLabelsHash() string {
if x != nil {
return x.LabelsHash
}
return ""
}
func (x *AlertInstance) GetLabels() map[string]string {
if x != nil {
return x.Labels
}
return nil
}
func (x *AlertInstance) GetCurrentState() string {
if x != nil {
return x.CurrentState
}
return ""
}
func (x *AlertInstance) GetCurrentReason() string {
if x != nil {
return x.CurrentReason
}
return ""
}
func (x *AlertInstance) GetCurrentStateSince() *timestamppb.Timestamp {
if x != nil {
return x.CurrentStateSince
}
return nil
}
func (x *AlertInstance) GetCurrentStateEnd() *timestamppb.Timestamp {
if x != nil {
return x.CurrentStateEnd
}
return nil
}
func (x *AlertInstance) GetLastEvalTime() *timestamppb.Timestamp {
if x != nil {
return x.LastEvalTime
}
return nil
}
func (x *AlertInstance) GetLastSentAt() *timestamppb.Timestamp {
if x != nil {
return x.LastSentAt
}
return nil
}
func (x *AlertInstance) GetResolvedAt() *timestamppb.Timestamp {
if x != nil {
return x.ResolvedAt
}
return nil
}
func (x *AlertInstance) GetResultFingerprint() string {
if x != nil {
return x.ResultFingerprint
}
return ""
}
type AlertInstances struct {
state protoimpl.MessageState `protogen:"open.v1"`
Instances []*AlertInstance `protobuf:"bytes,1,rep,name=instances,proto3" json:"instances,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *AlertInstances) Reset() {
*x = AlertInstances{}
mi := &file_alert_rule_state_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *AlertInstances) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AlertInstances) ProtoMessage() {}
func (x *AlertInstances) ProtoReflect() protoreflect.Message {
mi := &file_alert_rule_state_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AlertInstances.ProtoReflect.Descriptor instead.
func (*AlertInstances) Descriptor() ([]byte, []int) {
return file_alert_rule_state_proto_rawDescGZIP(), []int{1}
}
func (x *AlertInstances) GetInstances() []*AlertInstance {
if x != nil {
return x.Instances
}
return nil
}
var File_alert_rule_state_proto protoreflect.FileDescriptor
var file_alert_rule_state_proto_rawDesc = []byte{
0x0a, 0x16, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x5f, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x73, 0x74, 0x61,
0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x6e, 0x67, 0x61, 0x6c, 0x65, 0x72,
0x74, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65,
0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xfc, 0x04, 0x0a, 0x0d,
0x41, 0x6c, 0x65, 0x72, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x12, 0x1f, 0x0a,
0x0b, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0a, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x48, 0x61, 0x73, 0x68, 0x12, 0x43,
0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b,
0x2e, 0x6e, 0x67, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76,
0x31, 0x2e, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x2e,
0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x6c, 0x61, 0x62,
0x65, 0x6c, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73,
0x74, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x75, 0x72, 0x72,
0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x75, 0x72, 0x72,
0x65, 0x6e, 0x74, 0x5f, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0d, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12,
0x4a, 0x0a, 0x13, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65,
0x5f, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67,
0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x11, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e,
0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x53, 0x69, 0x6e, 0x63, 0x65, 0x12, 0x46, 0x0a, 0x11, 0x63,
0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x5f, 0x65, 0x6e, 0x64,
0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x52, 0x0f, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65,
0x45, 0x6e, 0x64, 0x12, 0x40, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x65, 0x76, 0x61, 0x6c,
0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x45, 0x76, 0x61,
0x6c, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x3c, 0x0a, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x65,
0x6e, 0x74, 0x5f, 0x61, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x6e,
0x74, 0x41, 0x74, 0x12, 0x3b, 0x0a, 0x0b, 0x72, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x64, 0x5f,
0x61, 0x74, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x72, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x64, 0x41, 0x74,
0x12, 0x2d, 0x0a, 0x12, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x5f, 0x66, 0x69, 0x6e, 0x67, 0x65,
0x72, 0x70, 0x72, 0x69, 0x6e, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x72, 0x65,
0x73, 0x75, 0x6c, 0x74, 0x46, 0x69, 0x6e, 0x67, 0x65, 0x72, 0x70, 0x72, 0x69, 0x6e, 0x74, 0x1a,
0x39, 0x0a, 0x0b, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10,
0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79,
0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x4f, 0x0a, 0x0e, 0x41, 0x6c,
0x65, 0x72, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x73, 0x12, 0x3d, 0x0a, 0x09,
0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x1f, 0x2e, 0x6e, 0x67, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x2e, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e,
0x76, 0x31, 0x2e, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65,
0x52, 0x09, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x73, 0x42, 0x40, 0x5a, 0x3e, 0x67,
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e,
0x61, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x73, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x6e, 0x67, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x2f, 0x73,
0x74, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_alert_rule_state_proto_rawDescOnce sync.Once
file_alert_rule_state_proto_rawDescData = file_alert_rule_state_proto_rawDesc
)
func file_alert_rule_state_proto_rawDescGZIP() []byte {
file_alert_rule_state_proto_rawDescOnce.Do(func() {
file_alert_rule_state_proto_rawDescData = protoimpl.X.CompressGZIP(file_alert_rule_state_proto_rawDescData)
})
return file_alert_rule_state_proto_rawDescData
}
var file_alert_rule_state_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_alert_rule_state_proto_goTypes = []any{
(*AlertInstance)(nil), // 0: ngalert.store.v1.AlertInstance
(*AlertInstances)(nil), // 1: ngalert.store.v1.AlertInstances
nil, // 2: ngalert.store.v1.AlertInstance.LabelsEntry
(*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp
}
var file_alert_rule_state_proto_depIdxs = []int32{
2, // 0: ngalert.store.v1.AlertInstance.labels:type_name -> ngalert.store.v1.AlertInstance.LabelsEntry
3, // 1: ngalert.store.v1.AlertInstance.current_state_since:type_name -> google.protobuf.Timestamp
3, // 2: ngalert.store.v1.AlertInstance.current_state_end:type_name -> google.protobuf.Timestamp
3, // 3: ngalert.store.v1.AlertInstance.last_eval_time:type_name -> google.protobuf.Timestamp
3, // 4: ngalert.store.v1.AlertInstance.last_sent_at:type_name -> google.protobuf.Timestamp
3, // 5: ngalert.store.v1.AlertInstance.resolved_at:type_name -> google.protobuf.Timestamp
0, // 6: ngalert.store.v1.AlertInstances.instances:type_name -> ngalert.store.v1.AlertInstance
7, // [7:7] is the sub-list for method output_type
7, // [7:7] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension type_name
7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
}
func init() { file_alert_rule_state_proto_init() }
func file_alert_rule_state_proto_init() {
if File_alert_rule_state_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_alert_rule_state_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_alert_rule_state_proto_goTypes,
DependencyIndexes: file_alert_rule_state_proto_depIdxs,
MessageInfos: file_alert_rule_state_proto_msgTypes,
}.Build()
File_alert_rule_state_proto = out.File
file_alert_rule_state_proto_rawDesc = nil
file_alert_rule_state_proto_goTypes = nil
file_alert_rule_state_proto_depIdxs = nil
}

View File

@ -0,0 +1,24 @@
syntax = "proto3";
package ngalert.store.v1;
import "google/protobuf/timestamp.proto";
option go_package = "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1";
message AlertInstance {
string labels_hash = 1;
map<string, string> labels = 2;
string current_state = 3;
string current_reason = 4;
google.protobuf.Timestamp current_state_since = 5;
google.protobuf.Timestamp current_state_end = 6;
google.protobuf.Timestamp last_eval_time = 7;
google.protobuf.Timestamp last_sent_at = 8;
google.protobuf.Timestamp resolved_at = 9;
string result_fingerprint = 10;
}
message AlertInstances {
repeated AlertInstance instances = 1;
}
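To illustrate how these messages are used by the new store, here is an editor's sketch (not part of this commit) of a standalone Go program that builds an AlertInstances message, marshals it, and wraps it in a snappy stream, mirroring the store's compressAlertInstances helper later in this diff and the format written to the alert_rule_state.data column. The label values are made up for illustration.

package main

import (
	"bytes"
	"fmt"
	"time"

	"github.com/golang/snappy"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/timestamppb"

	pb "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1"
)

func main() {
	// One rule's instances, as they would be passed to the store.
	msg := &pb.AlertInstances{
		Instances: []*pb.AlertInstance{{
			LabelsHash:        "hash1", // made-up example values
			Labels:            map[string]string{"alertname": "HighCPU"},
			CurrentState:      "Alerting",
			CurrentStateSince: timestamppb.New(time.Now()),
		}},
	}

	// Marshal to protobuf bytes, then compress with snappy.
	raw, err := proto.Marshal(msg)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	w := snappy.NewBufferedWriter(&buf)
	if _, err := w.Write(raw); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("marshaled %d bytes, compressed to %d bytes\n", len(raw), buf.Len())
}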

View File

@ -0,0 +1,5 @@
version: v1
plugins:
- plugin: go
out: pkg/services/ngalert/store/proto/v1
opt: paths=source_relative

View File

@ -0,0 +1,7 @@
version: v2
lint:
use:
- DEFAULT
breaking:
use:
- FILE

View File

@ -0,0 +1,261 @@
package store
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/golang/snappy"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/ngalert/models"
pb "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1"
)
// ProtoInstanceDBStore is a store for alert instances that keeps the state of each rule
// as a single row in the database, with the rule's alert instances stored as a compressed protobuf message.
type ProtoInstanceDBStore struct {
SQLStore db.DB
Logger log.Logger
FeatureToggles featuremgmt.FeatureToggles
}
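For orientation, the following is an editor's illustrative sketch, not part of this commit, of how the three exported fields would be wired together; the concrete db.DB and feature toggles come from Grafana's service setup, and the logger name is only an assumption.

// newCompressedStateStore is a hypothetical helper showing how the store is assembled.
// Grafana wires these dependencies elsewhere; this is for illustration only.
func newCompressedStateStore(sqlStore db.DB, toggles featuremgmt.FeatureToggles) ProtoInstanceDBStore {
	return ProtoInstanceDBStore{
		SQLStore:       sqlStore,
		Logger:         log.New("ngalert.state.store"), // assumed logger name
		FeatureToggles: toggles,
	}
}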
func (st ProtoInstanceDBStore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) (result []*models.AlertInstance, err error) {
logger := st.Logger.FromContext(ctx)
logger.Debug("ListAlertInstances called", "rule_uid", cmd.RuleUID, "org_id", cmd.RuleOrgID)
alertInstances := make([]*models.AlertInstance, 0)
err = st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
s := strings.Builder{}
params := make([]any, 0)
addToQuery := func(stmt string, p ...any) {
s.WriteString(stmt)
params = append(params, p...)
}
addToQuery("SELECT * FROM alert_rule_state WHERE org_id = ?", cmd.RuleOrgID)
if cmd.RuleUID != "" {
addToQuery(" AND rule_uid = ?", cmd.RuleUID)
}
// Execute query to get compressed instances
type compressedRow struct {
OrgID int64 `xorm:"org_id"`
RuleUID string `xorm:"rule_uid"`
Data []byte `xorm:"data"`
}
rows := make([]compressedRow, 0)
if err := sess.SQL(s.String(), params...).Find(&rows); err != nil {
return fmt.Errorf("failed to query alert_rule_state: %w", err)
}
for _, row := range rows {
instances, err := decompressAlertInstances(row.Data)
if err != nil {
return fmt.Errorf("failed to decompress alert instances for rule %s: %w", row.RuleUID, err)
}
// Convert proto instances to model instances
for _, protoInstance := range instances {
modelInstance := alertInstanceProtoToModel(row.RuleUID, row.OrgID, protoInstance)
if modelInstance != nil {
// If FlagAlertingNoNormalState is enabled, we should not return instances with normal state and no reason.
if st.FeatureToggles.IsEnabled(ctx, featuremgmt.FlagAlertingNoNormalState) {
if modelInstance.CurrentState == models.InstanceStateNormal && modelInstance.CurrentReason == "" {
continue
}
}
alertInstances = append(alertInstances, modelInstance)
}
}
}
return nil
})
logger.Debug("ListAlertInstances completed", "instances", len(alertInstances))
return alertInstances, err
}
func (st ProtoInstanceDBStore) SaveAlertInstance(ctx context.Context, alertInstance models.AlertInstance) error {
st.Logger.Error("SaveAlertInstance called and not implemented")
return errors.New("save alert instance is not implemented for proto instance database store")
}
func (st ProtoInstanceDBStore) FetchOrgIds(ctx context.Context) ([]int64, error) {
orgIds := []int64{}
err := st.SQLStore.WithDbSession(ctx, func(sess *db.Session) error {
s := strings.Builder{}
params := make([]any, 0)
addToQuery := func(stmt string, p ...any) {
s.WriteString(stmt)
params = append(params, p...)
}
addToQuery("SELECT DISTINCT org_id FROM alert_rule_state")
if err := sess.SQL(s.String(), params...).Find(&orgIds); err != nil {
return err
}
return nil
})
return orgIds, err
}
func (st ProtoInstanceDBStore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error {
logger := st.Logger.FromContext(ctx)
logger.Error("DeleteAlertInstances called and not implemented")
return errors.New("delete alert instances is not implemented for proto instance database store")
}
func (st ProtoInstanceDBStore) SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error {
logger := st.Logger.FromContext(ctx)
logger.Debug("SaveAlertInstancesForRule called", "rule_uid", key.UID, "org_id", key.OrgID, "instances", len(instances))
alert_instances_proto := make([]*pb.AlertInstance, len(instances))
for i, instance := range instances {
alert_instances_proto[i] = alertInstanceModelToProto(instance)
}
compressedAlertInstances, err := compressAlertInstances(alert_instances_proto)
if err != nil {
return fmt.Errorf("failed to compress alert instances: %w", err)
}
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error {
params := []any{key.OrgID, key.UID, compressedAlertInstances, time.Now()}
upsertSQL := st.SQLStore.GetDialect().UpsertSQL(
"alert_rule_state",
[]string{"org_id", "rule_uid"},
[]string{"org_id", "rule_uid", "data", "updated_at"},
)
_, err = sess.SQL(upsertSQL, params...).Query()
return err
})
}
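A minimal caller sketch (editor's illustration, not part of this diff) of how a rule's instances are persisted through this method; it assumes an *models.AlertRule such as the one created with tests.CreateTestAlertRule in the integration test earlier in this diff.

// persistRuleState is a hypothetical wrapper showing the intended call pattern.
func persistRuleState(ctx context.Context, st ProtoInstanceDBStore, rule *models.AlertRule, instances []models.AlertInstance) error {
	// GetKeyWithGroup is the same accessor the integration test uses.
	key := rule.GetKeyWithGroup()
	if err := st.SaveAlertInstancesForRule(ctx, key, instances); err != nil {
		return fmt.Errorf("failed to persist compressed state for rule %s: %w", key.UID, err)
	}
	return nil
}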
func (st ProtoInstanceDBStore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error {
logger := st.Logger.FromContext(ctx)
logger.Debug("DeleteAlertInstancesByRule called", "rule_uid", key.UID, "org_id", key.OrgID)
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error {
_, err := sess.Exec("DELETE FROM alert_rule_state WHERE org_id = ? AND rule_uid = ?", key.OrgID, key.UID)
return err
})
}
func (st ProtoInstanceDBStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error {
logger := st.Logger.FromContext(ctx)
logger.Error("FullSync called and not implemented")
return errors.New("fullsync is not implemented for proto instance database store")
}
func alertInstanceModelToProto(modelInstance models.AlertInstance) *pb.AlertInstance {
return &pb.AlertInstance{
Labels: modelInstance.Labels,
LabelsHash: modelInstance.LabelsHash,
CurrentState: string(modelInstance.CurrentState),
CurrentStateSince: timestamppb.New(modelInstance.CurrentStateSince),
CurrentStateEnd: timestamppb.New(modelInstance.CurrentStateEnd),
CurrentReason: modelInstance.CurrentReason,
LastEvalTime: timestamppb.New(modelInstance.LastEvalTime),
LastSentAt: nullableTimeToTimestamp(modelInstance.LastSentAt),
ResolvedAt: nullableTimeToTimestamp(modelInstance.ResolvedAt),
ResultFingerprint: modelInstance.ResultFingerprint,
}
}
func compressAlertInstances(instances []*pb.AlertInstance) ([]byte, error) {
mProto, err := proto.Marshal(&pb.AlertInstances{Instances: instances})
if err != nil {
return nil, fmt.Errorf("failed to marshal protobuf: %w", err)
}
var b bytes.Buffer
writer := snappy.NewBufferedWriter(&b)
if _, err := writer.Write(mProto); err != nil {
return nil, fmt.Errorf("failed to write compressed data: %w", err)
}
if err := writer.Close(); err != nil {
return nil, fmt.Errorf("failed to close snappy writer: %w", err)
}
return b.Bytes(), nil
}
func alertInstanceProtoToModel(ruleUID string, ruleOrgID int64, protoInstance *pb.AlertInstance) *models.AlertInstance {
if protoInstance == nil {
return nil
}
return &models.AlertInstance{
AlertInstanceKey: models.AlertInstanceKey{
RuleOrgID: ruleOrgID,
RuleUID: ruleUID,
LabelsHash: protoInstance.LabelsHash,
},
Labels: protoInstance.Labels,
CurrentState: models.InstanceStateType(protoInstance.CurrentState),
CurrentStateSince: protoInstance.CurrentStateSince.AsTime(),
CurrentStateEnd: protoInstance.CurrentStateEnd.AsTime(),
CurrentReason: protoInstance.CurrentReason,
LastEvalTime: protoInstance.LastEvalTime.AsTime(),
LastSentAt: nullableTimestampToTime(protoInstance.LastSentAt),
ResolvedAt: nullableTimestampToTime(protoInstance.ResolvedAt),
ResultFingerprint: protoInstance.ResultFingerprint,
}
}
func decompressAlertInstances(compressed []byte) ([]*pb.AlertInstance, error) {
if len(compressed) == 0 {
return nil, nil
}
reader := snappy.NewReader(bytes.NewReader(compressed))
var b bytes.Buffer
if _, err := b.ReadFrom(reader); err != nil {
return nil, fmt.Errorf("failed to read compressed data: %w", err)
}
var instances pb.AlertInstances
if err := proto.Unmarshal(b.Bytes(), &instances); err != nil {
return nil, fmt.Errorf("failed to unmarshal protobuf: %w", err)
}
return instances.Instances, nil
}
// nullableTimeToTimestamp converts a nullable time.Time to a *timestamppb.Timestamp, returning nil when the input is nil.
func nullableTimeToTimestamp(t *time.Time) *timestamppb.Timestamp {
if t == nil {
return nil
}
return timestamppb.New(*t)
}
// nullableTimestampToTime converts a nullable *timestamppb.Timestamp to a *time.Time, returning nil when the input is nil.
func nullableTimestampToTime(ts *timestamppb.Timestamp) *time.Time {
if ts == nil {
return nil
}
t := ts.AsTime()
return &t
}

View File

@ -0,0 +1,176 @@
package store
import (
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/grafana/grafana/pkg/services/ngalert/models"
pb "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1"
)
func TestAlertInstanceModelToProto(t *testing.T) {
currentStateSince := time.Now()
currentStateEnd := currentStateSince.Add(time.Minute)
lastEvalTime := currentStateSince.Add(-time.Minute)
lastSentAt := currentStateSince.Add(-2 * time.Minute)
resolvedAt := currentStateSince.Add(-3 * time.Minute)
tests := []struct {
name string
input models.AlertInstance
expected *pb.AlertInstance
}{
{
name: "valid instance",
input: models.AlertInstance{
Labels: map[string]string{"key": "value"},
AlertInstanceKey: models.AlertInstanceKey{
RuleUID: "rule-uid-1",
RuleOrgID: 1,
LabelsHash: "hash123",
},
CurrentState: models.InstanceStateFiring,
CurrentStateSince: currentStateSince,
CurrentStateEnd: currentStateEnd,
CurrentReason: "Some reason",
LastEvalTime: lastEvalTime,
LastSentAt: &lastSentAt,
ResolvedAt: &resolvedAt,
ResultFingerprint: "fingerprint",
},
expected: &pb.AlertInstance{
Labels: map[string]string{"key": "value"},
LabelsHash: "hash123",
CurrentState: "Alerting",
CurrentStateSince: timestamppb.New(currentStateSince),
CurrentStateEnd: timestamppb.New(currentStateEnd),
CurrentReason: "Some reason",
LastEvalTime: timestamppb.New(lastEvalTime),
LastSentAt: toProtoTimestampPtr(&lastSentAt),
ResolvedAt: toProtoTimestampPtr(&resolvedAt),
ResultFingerprint: "fingerprint",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := alertInstanceModelToProto(tt.input)
require.Equal(t, tt.expected, result)
})
}
}
func TestAlertInstanceProtoToModel(t *testing.T) {
currentStateSince := time.Now().UTC()
currentStateEnd := currentStateSince.Add(time.Minute).UTC()
lastEvalTime := currentStateSince.Add(-time.Minute).UTC()
lastSentAt := currentStateSince.Add(-2 * time.Minute).UTC()
resolvedAt := currentStateSince.Add(-3 * time.Minute).UTC()
ruleUID := "rule-uid-1"
orgID := int64(1)
tests := []struct {
name string
input *pb.AlertInstance
expected *models.AlertInstance
}{
{
name: "valid instance",
input: &pb.AlertInstance{
Labels: map[string]string{"key": "value"},
LabelsHash: "hash123",
CurrentState: "Alerting",
CurrentStateSince: timestamppb.New(currentStateSince),
CurrentStateEnd: timestamppb.New(currentStateEnd),
LastEvalTime: timestamppb.New(lastEvalTime),
LastSentAt: toProtoTimestampPtr(&lastSentAt),
ResolvedAt: toProtoTimestampPtr(&resolvedAt),
ResultFingerprint: "fingerprint",
},
expected: &models.AlertInstance{
Labels: map[string]string{"key": "value"},
AlertInstanceKey: models.AlertInstanceKey{
RuleUID: ruleUID,
RuleOrgID: orgID,
LabelsHash: "hash123",
},
CurrentState: models.InstanceStateFiring,
CurrentStateSince: currentStateSince,
CurrentStateEnd: currentStateEnd,
LastEvalTime: lastEvalTime,
LastSentAt: &lastSentAt,
ResolvedAt: &resolvedAt,
ResultFingerprint: "fingerprint",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := alertInstanceProtoToModel(ruleUID, orgID, tt.input)
require.Equal(t, tt.expected, result)
})
}
}
func TestModelAlertInstanceMatchesProtobuf(t *testing.T) {
// The AlertInstance protobuf must always contain the same information
// as the model, so that it's preserved across Grafana restarts.
//
// If the AlertInstance model changes, review the protobuf and the test
// and update them accordingly.
t.Run("when AlertInstance model changes", func(t *testing.T) {
modelType := reflect.TypeOf(models.AlertInstance{})
require.Equal(t, 10, modelType.NumField(), "AlertInstance model has changed, update the protobuf")
})
}
func TestCompressAndDecompressAlertInstances(t *testing.T) {
now := time.Now()
alertInstances := []*pb.AlertInstance{
{
Labels: map[string]string{"label-1": "value-1"},
LabelsHash: "hash-1",
CurrentState: "normal",
CurrentStateSince: timestamppb.New(now),
CurrentStateEnd: timestamppb.New(now.Add(time.Hour)),
CurrentReason: "reason-1",
LastEvalTime: timestamppb.New(now.Add(-time.Minute)),
ResolvedAt: timestamppb.New(now.Add(time.Hour * 2)),
ResultFingerprint: "fingerprint-1",
},
{
Labels: map[string]string{"label-2": "value-2"},
LabelsHash: "hash-2",
CurrentState: "firing",
CurrentStateSince: timestamppb.New(now),
CurrentReason: "reason-2",
LastEvalTime: timestamppb.New(now.Add(-time.Minute * 2)),
},
}
compressedData, err := compressAlertInstances(alertInstances)
require.NoError(t, err)
decompressedInstances, err := decompressAlertInstances(compressedData)
require.NoError(t, err)
// Compare the original and decompressed instances
require.Equal(t, len(alertInstances), len(decompressedInstances))
require.EqualExportedValues(t, alertInstances[0], decompressedInstances[0])
require.EqualExportedValues(t, alertInstances[1], decompressedInstances[1])
}
func toProtoTimestampPtr(tm *time.Time) *timestamppb.Timestamp {
if tm == nil {
return nil
}
return timestamppb.New(*tm)
}

View File

@ -41,10 +41,30 @@ import (
"github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/util"
) )
type TestEnvOptions struct {
featureToggles featuremgmt.FeatureToggles
}
type TestEnvOption func(*TestEnvOptions)
func WithFeatureToggles(toggles featuremgmt.FeatureToggles) TestEnvOption {
return func(opts *TestEnvOptions) {
opts.featureToggles = toggles
}
}
// SetupTestEnv initializes a store to be used by the tests.
-func SetupTestEnv(tb testing.TB, baseInterval time.Duration) (*ngalert.AlertNG, *store.DBstore) {
+func SetupTestEnv(tb testing.TB, baseInterval time.Duration, opts ...TestEnvOption) (*ngalert.AlertNG, *store.DBstore) {
	tb.Helper()
options := TestEnvOptions{
featureToggles: featuremgmt.WithFeatures(),
}
for _, opt := range opts {
opt(&options)
}
	cfg := setting.NewCfg()
	cfg.UnifiedAlerting = setting.UnifiedAlertingSettings{
		BaseInterval: setting.SchedulerBaseInterval,
@ -64,18 +84,18 @@ func SetupTestEnv(tb testing.TB, baseInterval time.Duration) (*ngalert.AlertNG,
	bus := bus.ProvideBus(tracer)
	folderStore := folderimpl.ProvideDashboardFolderStore(sqlStore)
	dashboardService, dashboardStore := testutil.SetupDashboardService(tb, sqlStore, folderStore, cfg)
-	features := featuremgmt.WithFeatures()
-	folderService := testutil.SetupFolderService(tb, cfg, sqlStore, dashboardStore, folderStore, bus, features, ac)
-	ruleStore, err := store.ProvideDBStore(cfg, featuremgmt.WithFeatures(), sqlStore, folderService, &dashboards.FakeDashboardService{}, ac, bus)
+	folderService := testutil.SetupFolderService(tb, cfg, sqlStore, dashboardStore, folderStore, bus, options.featureToggles, ac)
+	ruleStore, err := store.ProvideDBStore(cfg, options.featureToggles, sqlStore, folderService, &dashboards.FakeDashboardService{}, ac, bus)
	require.NoError(tb, err)
	ng, err := ngalert.ProvideService(
-		cfg, features, nil, nil, routing.NewRouteRegister(), sqlStore, kvstore.NewFakeKVStore(), nil, nil, quotatest.New(false, nil),
+		cfg, options.featureToggles, nil, nil, routing.NewRouteRegister(), sqlStore, kvstore.NewFakeKVStore(), nil, nil, quotatest.New(false, nil),
		secretsService, nil, m, folderService, ac, &dashboards.FakeDashboardService{}, nil, bus, ac,
		annotationstest.NewFakeAnnotationsRepo(), &pluginstore.FakePluginStore{}, tracer, ruleStore, httpclient.NewProvider(), ngalertfakes.NewFakeReceiverPermissionsService(),
	)
	require.NoError(tb, err)
	return ng, &store.DBstore{
-		FeatureToggles: features,
+		FeatureToggles: options.featureToggles,
		SQLStore:       ng.SQLStore,
		Cfg: setting.UnifiedAlertingSettings{
			BaseInterval: baseInterval * time.Second,

View File

@ -143,4 +143,6 @@ func (oss *OSSMigrations) AddMigration(mg *Migrator) {
	accesscontrol.AddReceiverCreateScopeMigration(mg)
	ualert.AddAlertRuleUpdatedByMigration(mg)
+	ualert.AddAlertRuleStateTable(mg)
}

View File

@ -0,0 +1,29 @@
package ualert
import "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
// AddAlertRuleStateTable adds the alert_rule_state table used to store alert rule state data.
func AddAlertRuleStateTable(mg *migrator.Migrator) {
alertStateTable := migrator.Table{
Name: "alert_rule_state",
Columns: []*migrator.Column{
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "org_id", Type: migrator.DB_BigInt, Nullable: false},
{Name: "rule_uid", Type: migrator.DB_NVarchar, Length: UIDMaxLength, Nullable: false},
{Name: "data", Type: migrator.DB_LongBlob, Nullable: false},
{Name: "updated_at", Type: migrator.DB_DateTime, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"org_id", "rule_uid"}, Type: migrator.UniqueIndex},
},
}
mg.AddMigration(
"add alert_rule_state table",
migrator.NewAddTableMigration(alertStateTable),
)
mg.AddMigration(
"add index to alert_rule_state on org_id and rule_uid columns",
migrator.NewAddIndexMigration(alertStateTable, alertStateTable.Indices[0]),
)
}

View File

@ -47,11 +47,11 @@ composableKinds: DataQuery: {
// Resource group used in template variable queries
resourceGroup?: string
// Namespace used in template variable queries
namespace?: string
// Resource used in template variable queries
resource?: string
// Region used in template variable queries
region?: string
// Custom namespace used in template variable queries
customNamespace?: string
// Azure Monitor query type.