diff --git a/conf/defaults.ini b/conf/defaults.ini index e01a8df0c63..f037ec0de10 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -1180,6 +1180,10 @@ min_interval = 10s # (concurrent queries per rule disabled). max_state_save_concurrency = 1 +# If the feature flag 'alertingSaveStatePeriodic' is enabled, this is the interval that is used to persist the alerting instances to the database. +# The interval string is a possibly signed sequence of decimal numbers, followed by a unit suffix (ms, s, m, h, d), e.g. 30s or 1m. +state_periodic_save_interval = 5m + [unified_alerting.screenshots] # Enable screenshots in notifications. You must have either installed the Grafana image rendering # plugin, or set up Grafana to use a remote rendering service. diff --git a/conf/sample.ini b/conf/sample.ini index 02d62ec86da..ec1bb84f03e 100644 --- a/conf/sample.ini +++ b/conf/sample.ini @@ -1112,6 +1112,15 @@ # The interval string is a possibly signed sequence of decimal numbers, followed by a unit suffix (ms, s, m, h, d), e.g. 30s or 1m. ;min_interval = 10s +# This is an experimental option to add parallelization to saving alert states in the database. +# It configures the maximum number of concurrent queries per rule evaluated. The default value is 1 +# (concurrent queries per rule disabled). +;max_state_save_concurrency = 1 + +# If the feature flag 'alertingSaveStatePeriodic' is enabled, this is the interval that is used to persist the alerting instances to the database. +# The interval string is a possibly signed sequence of decimal numbers, followed by a unit suffix (ms, s, m, h, d), e.g. 30s or 1m. +;state_periodic_save_interval = 5m + [unified_alerting.reserved_labels] # Comma-separated list of reserved labels added by the Grafana Alerting engine that should be disabled. # For example: `disabled_labels=grafana_folder` diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index 69881317e3f..9493f21bf45 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -177,4 +177,5 @@ export interface FeatureToggles { jitterAlertRules?: boolean; jitterAlertRulesWithinGroups?: boolean; onPremToCloudMigrations?: boolean; + alertingSaveStatePeriodic?: boolean; } diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index 9b29e18147b..022e4d5eac7 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -1354,5 +1354,13 @@ var ( Owner: grafanaOperatorExperienceSquad, Created: time.Date(2024, time.January, 22, 3, 30, 00, 00, time.UTC), }, + { + Name: "alertingSaveStatePeriodic", + Description: "Writes the state periodically to the database, asynchronous to rule evaluation", + Stage: FeatureStagePrivatePreview, + FrontendOnly: false, + Owner: grafanaAlertingSquad, + Created: time.Date(2024, time.January, 22, 12, 0, 0, 0, time.UTC), + }, } ) diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index e3ec2dc65d7..7a4fd21e8b5 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -158,3 +158,4 @@ newFolderPicker,experimental,@grafana/grafana-frontend-platform,2024-01-12,false jitterAlertRules,experimental,@grafana/alerting-squad,2024-01-17,false,false,true,false jitterAlertRulesWithinGroups,experimental,@grafana/alerting-squad,2024-01-17,false,false,true,false onPremToCloudMigrations,experimental,@grafana/grafana-operator-experience-squad,2024-01-22,false,false,false,false +alertingSaveStatePeriodic,privatePreview,@grafana/alerting-squad,2024-01-22,false,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 42e26adbc84..99ed5276759 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -642,4 +642,8 @@ const ( // FlagOnPremToCloudMigrations // In-development feature that will allow users to easily migrate their on-prem Grafana instances to Grafana Cloud. FlagOnPremToCloudMigrations = "onPremToCloudMigrations" + + // FlagAlertingSaveStatePeriodic + // Writes the state periodically to the database, asynchronous to rule evaluation + FlagAlertingSaveStatePeriodic = "alertingSaveStatePeriodic" ) diff --git a/pkg/services/ngalert/metrics/state.go b/pkg/services/ngalert/metrics/state.go index 83b87f72d05..92093ad6efa 100644 --- a/pkg/services/ngalert/metrics/state.go +++ b/pkg/services/ngalert/metrics/state.go @@ -6,8 +6,9 @@ import ( ) type State struct { - StateUpdateDuration prometheus.Histogram - r prometheus.Registerer + StateUpdateDuration prometheus.Histogram + StateFullSyncDuration prometheus.Histogram + r prometheus.Registerer } // Registerer exposes the Prometheus register directly. The state package needs this as, it uses a collector to fetch the current alerts by state in the system. @@ -27,5 +28,14 @@ func NewStateMetrics(r prometheus.Registerer) *State { Buckets: []float64{0.01, 0.1, 1, 2, 5, 10}, }, ), + StateFullSyncDuration: promauto.With(r).NewHistogram( + prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "state_full_sync_duration_seconds", + Help: "The duration of fully synchronizing the state with the database.", + Buckets: []float64{0.01, 0.1, 1, 2, 5, 10, 60}, + }, + ), } } diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 09e8412ff29..e4e610400ab 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -296,7 +296,12 @@ func (ng *AlertNG) init() error { Tracer: ng.tracer, Log: log.New("ngalert.state.manager"), } - statePersister := state.NewSyncStatePersisiter(log.New("ngalert.state.manager.persist"), cfg) + logger := log.New("ngalert.state.manager.persist") + statePersister := state.NewSyncStatePersisiter(logger, cfg) + if ng.FeatureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStatePeriodic) { + ticker := clock.New().Ticker(ng.Cfg.UnifiedAlerting.StatePeriodicSaveInterval) + statePersister = state.NewAsyncStatePersister(logger, ticker, cfg) + } stateManager := state.NewManager(cfg, statePersister) scheduler := schedule.NewScheduler(schedCfg, stateManager) @@ -423,6 +428,9 @@ func (ng *AlertNG) Run(ctx context.Context) error { children.Go(func() error { return ng.schedule.Run(subCtx) }) + children.Go(func() error { + return ng.stateManager.Run(subCtx) + }) } return children.Wait() } diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index bef12d6120e..2f8eb2cf1bc 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -30,7 +30,7 @@ type AlertInstanceManager interface { } type StatePersister interface { - Async(ctx context.Context, ticker *clock.Ticker, cache *cache) + Async(ctx context.Context, cache *cache) Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition) } @@ -103,6 +103,11 @@ func NewManager(cfg ManagerCfg, statePersister StatePersister) *Manager { return m } +func (st *Manager) Run(ctx context.Context) error { + st.persister.Async(ctx, st.cache) + return nil +} + func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) { if st.instanceStore == nil { st.log.Info("Skip warming the state because instance store is not configured") diff --git a/pkg/services/ngalert/state/persister_async.go b/pkg/services/ngalert/state/persister_async.go index bcd86ab5a72..a0118b92df8 100644 --- a/pkg/services/ngalert/state/persister_async.go +++ b/pkg/services/ngalert/state/persister_async.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/ngalert/metrics" ) type AsyncStatePersister struct { @@ -15,20 +16,24 @@ type AsyncStatePersister struct { // doNotSaveNormalState controls whether eval.Normal state is persisted to the database and returned by get methods. doNotSaveNormalState bool store InstanceStore + ticker *clock.Ticker + metrics *metrics.State } -func NewAsyncStatePersister(log log.Logger, cfg ManagerCfg) StatePersister { +func NewAsyncStatePersister(log log.Logger, ticker *clock.Ticker, cfg ManagerCfg) StatePersister { return &AsyncStatePersister{ log: log, store: cfg.InstanceStore, + ticker: ticker, doNotSaveNormalState: cfg.DoNotSaveNormalState, + metrics: cfg.Metrics, } } -func (a *AsyncStatePersister) Async(ctx context.Context, ticker *clock.Ticker, cache *cache) { +func (a *AsyncStatePersister) Async(ctx context.Context, cache *cache) { for { select { - case <-ticker.C: + case <-a.ticker.C: if err := a.fullSync(ctx, cache); err != nil { a.log.Error("Failed to do a full state sync to database", "err", err) } @@ -37,7 +42,7 @@ func (a *AsyncStatePersister) Async(ctx context.Context, ticker *clock.Ticker, c if err := a.fullSync(context.Background(), cache); err != nil { a.log.Error("Failed to do a full state sync to database", "err", err) } - ticker.Stop() + a.ticker.Stop() a.log.Info("State async worker is shut down.") return } @@ -46,13 +51,16 @@ func (a *AsyncStatePersister) Async(ctx context.Context, ticker *clock.Ticker, c func (a *AsyncStatePersister) fullSync(ctx context.Context, cache *cache) error { startTime := time.Now() - a.log.Info("Full state sync start") + a.log.Debug("Full state sync start") instances := cache.asInstances(a.doNotSaveNormalState) if err := a.store.FullSync(ctx, instances); err != nil { a.log.Error("Full state sync failed", "duration", time.Since(startTime), "instances", len(instances)) return err } - a.log.Info("Full state sync done", "duration", time.Since(startTime), "instances", len(instances)) + a.log.Debug("Full state sync done", "duration", time.Since(startTime), "instances", len(instances)) + if a.metrics != nil { + a.metrics.StateFullSyncDuration.Observe(time.Since(startTime).Seconds()) + } return nil } diff --git a/pkg/services/ngalert/state/persister_async_test.go b/pkg/services/ngalert/state/persister_async_test.go index aba2e5ccba4..5358510afe2 100644 --- a/pkg/services/ngalert/state/persister_async_test.go +++ b/pkg/services/ngalert/state/persister_async_test.go @@ -18,7 +18,7 @@ func TestAsyncStatePersister_Async(t *testing.T) { store := &FakeInstanceStore{} logger := log.New("async.test") - persister := NewAsyncStatePersister(logger, ManagerCfg{ + persister := NewAsyncStatePersister(logger, mockClock.Ticker(1*time.Second), ManagerCfg{ InstanceStore: store, }) @@ -28,11 +28,9 @@ func TestAsyncStatePersister_Async(t *testing.T) { cancel() }() - ticker := mockClock.Ticker(1 * time.Second) - cache := newCache() - go persister.Async(ctx, ticker, cache) + go persister.Async(ctx, cache) cache.set(&State{ OrgID: 1, @@ -52,17 +50,15 @@ func TestAsyncStatePersister_Async(t *testing.T) { store := &FakeInstanceStore{} logger := log.New("async.test") - persister := NewAsyncStatePersister(logger, ManagerCfg{ + persister := NewAsyncStatePersister(logger, mockClock.Ticker(1*time.Second), ManagerCfg{ InstanceStore: store, }) ctx, cancel := context.WithCancel(context.Background()) - ticker := mockClock.Ticker(1 * time.Second) - cache := newCache() - go persister.Async(ctx, ticker, cache) + go persister.Async(ctx, cache) cache.set(&State{ OrgID: 1, diff --git a/pkg/services/ngalert/state/persister_noop.go b/pkg/services/ngalert/state/persister_noop.go index c1abd5dc1d6..a500f34eead 100644 --- a/pkg/services/ngalert/state/persister_noop.go +++ b/pkg/services/ngalert/state/persister_noop.go @@ -3,13 +3,12 @@ package state import ( "context" - "github.com/benbjohnson/clock" "go.opentelemetry.io/otel/trace" ) type NoopPersister struct{} -func (n *NoopPersister) Async(_ context.Context, _ *clock.Ticker, _ *cache) {} +func (n *NoopPersister) Async(_ context.Context, _ *cache) {} func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _, _ []StateTransition) {} func NewNoopPersister() StatePersister { diff --git a/pkg/services/ngalert/state/persister_sync.go b/pkg/services/ngalert/state/persister_sync.go index 5d28922b213..bc8c5cf1673 100644 --- a/pkg/services/ngalert/state/persister_sync.go +++ b/pkg/services/ngalert/state/persister_sync.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/benbjohnson/clock" "github.com/grafana/dskit/concurrency" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -31,7 +30,7 @@ func NewSyncStatePersisiter(log log.Logger, cfg ManagerCfg) StatePersister { } } -func (a *SyncStatePersister) Async(_ context.Context, _ *clock.Ticker, _ *cache) { +func (a *SyncStatePersister) Async(_ context.Context, _ *cache) { a.log.Debug("Async: No-Op") } func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition) { diff --git a/pkg/setting/setting_unified_alerting.go b/pkg/setting/setting_unified_alerting.go index 3657430c89d..ca27fb228ca 100644 --- a/pkg/setting/setting_unified_alerting.go +++ b/pkg/setting/setting_unified_alerting.go @@ -98,7 +98,8 @@ type UnifiedAlertingSettings struct { RemoteAlertmanager RemoteAlertmanagerSettings Upgrade UnifiedAlertingUpgradeSettings // MaxStateSaveConcurrency controls the number of goroutines (per rule) that can save alert state in parallel. - MaxStateSaveConcurrency int + MaxStateSaveConcurrency int + StatePeriodicSaveInterval time.Duration } // RemoteAlertmanagerSettings contains the configuration needed @@ -403,6 +404,11 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error { uaCfg.MaxStateSaveConcurrency = ua.Key("max_state_save_concurrency").MustInt(1) + uaCfg.StatePeriodicSaveInterval, err = gtime.ParseDuration(valueAsString(ua, "state_periodic_save_interval", (time.Minute * 5).String())) + if err != nil { + return err + } + upgrade := iniFile.Section("unified_alerting.upgrade") uaCfgUpgrade := UnifiedAlertingUpgradeSettings{ CleanUpgrade: upgrade.Key("clean_upgrade").MustBool(false),