mirror of https://github.com/grafana/grafana.git
Alerting: Add jitter support for periodic alert state storage to reduce database load spikes (#111357)
What is this feature?

This PR implements a jitter mechanism for periodic alert state storage so that database load is distributed over time instead of all alert instances being processed simultaneously. When enabled via the state_periodic_save_jitter_enabled configuration option, the system spreads batch write operations across 85% of the save interval window, preventing database load spikes in high-cardinality alerting environments.

Why do we need this feature?

In production environments with high alert cardinality, the current periodic batch storage can cause database performance issues by processing all alert instances simultaneously at fixed intervals. Even when periodic batch storage improves overall performance, concentrating all database operations at a single point in time can overwhelm database resources, especially in resource-constrained environments.

Rather than performing all INSERT operations at once during the periodic save, distributing them across the time window until the next save cycle keeps service operation more stable within limited database resources. This prevents resource saturation by spreading the database load over the available interval.

For example, with 200,000 alert instances, a 5-minute interval, and a batch size of 4,000, instead of executing 50 batch operations simultaneously, the jitter mechanism distributes these operations across approximately 4.25 minutes (85% of 5 minutes), with each batch executed roughly every 5.2 seconds.

This PR provides system-level protection against such load spikes by distributing operations across time, reducing peak resource usage while keeping the benefits of periodic batch storage. The jitter mechanism is particularly valuable in resource-constrained environments where consistent database performance matters more than precise timing of state updates.
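For illustration only, here is a minimal, self-contained sketch of the delay calculation described above, plugged with the numbers from the example. The names (`batchJitterDelay`, `spacing`) are illustrative; the actual implementation lives in the AsyncStatePersister changes further down in this commit.

```go
package main

import (
    "fmt"
    "time"
)

// batchJitterDelay spreads batch i of n evenly across the window:
// delay = (i * window) / (n - 1), where window is 85% of the save interval.
func batchJitterDelay(i, n int, window time.Duration) time.Duration {
    if n <= 1 {
        return 0
    }
    return time.Duration(float64(window) * float64(i) / float64(n-1))
}

func main() {
    instances, batchSize := 200_000, 4_000
    interval := 5 * time.Minute

    n := (instances + batchSize - 1) / batchSize      // 50 batches
    window := time.Duration(float64(interval) * 0.85) // 4m15s (85% of 5m)

    // Consecutive batches are spaced window/(n-1) apart, i.e. roughly 5.2s.
    fmt.Println("batches:", n)
    fmt.Println("spacing:", batchJitterDelay(1, n, window))
    fmt.Println("last delay:", batchJitterDelay(n-1, n, window))
}
```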
This commit is contained in:
parent 310c83531c
commit 512c292e04
@@ -1453,6 +1453,10 @@ state_periodic_save_interval = 5m
# If the feature flag 'alertingSaveStatePeriodic' is enabled, this is the size of the batch that is saved to the database at once.
state_periodic_save_batch_size = 1

+# Enable jitter for periodic state saving to distribute database load over time.
+# When enabled, batches are spread across the save interval to prevent load spikes.
+state_periodic_save_jitter_enabled = false
+
# Disables the smoothing of alert evaluations across their evaluation window.
# Rules will evaluate in sync.
disable_jitter = false
@@ -1426,6 +1426,14 @@
# If the feature flag 'alertingSaveStatePeriodic' is enabled, this is the size of the batch that is saved to the database at once.
;state_periodic_save_batch_size = 1

+# Enable jitter for periodic state saves to distribute database load over time.
+# When enabled, batches of alert instances are saved with calculated delays between them,
+# preventing all instances from being written to the database simultaneously.
+# This helps reduce database load spikes during periodic saves, especially beneficial
+# in environments with many alert instances or high database contention.
+# The jitter delays are distributed within 85% of the save interval to ensure completion before the next cycle.
+;state_periodic_save_jitter_enabled = false
+
# Disables the smoothing of alert evaluations across their evaluation window.
# Rules will evaluate in sync.
;disable_jitter = false
@@ -77,6 +77,35 @@ By default, it saves the states every 5 minutes to the database and on each shut
can also be configured using the `state_periodic_save_interval` configuration flag. During this process, Grafana deletes all existing alert instances from the database and then writes the entire current set of instances back in batches in a single transaction.
Configure the size of each batch using the `state_periodic_save_batch_size` configuration option.

+#### Jitter for periodic saves
+
+To further distribute database load, you can enable jitter for periodic state saves by setting `state_periodic_save_jitter_enabled = true`. When jitter is enabled, instead of saving all batches simultaneously, Grafana spreads the batch writes across a calculated time window of 85% of the save interval.
+
+**How jitter works:**
+
+- Calculates delays for each batch: `delay = (batchIndex * timeWindow) / (totalBatches - 1)`
+- The time window uses 85% of the save interval as a safety margin
+- Batches are evenly distributed across the time window
+- All operations occur within a single database transaction
+
+**Configuration example:**
+
+```ini
+[unified_alerting]
+state_periodic_save_jitter_enabled = true
+state_periodic_save_interval = 1m
+state_periodic_save_batch_size = 100
+```
+
+**Performance impact:**
+For 2,000 alert instances with a 1-minute interval and a batch size of 100:
+
+- Creates 20 batches (2000 ÷ 100)
+- Spreads writes across 51 seconds (85% of 60s)
+- Batch writes occur roughly every 2.68 seconds
+
+This helps reduce database load spikes in environments with high alert cardinality by distributing writes over time rather than concentrating them at the beginning of each save cycle.
+
The time it takes to write to the database periodically can be monitored using the `state_full_sync_duration_seconds` metric
that is exposed by Grafana.
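For reference, a consolidated configuration sketch combining the settings discussed above. Note that periodic state saving itself requires the `alertingSaveStatePeriodic` feature flag; the `[feature_toggles]` stanza shown here is an assumption about how that flag is typically enabled and is not part of this change, and the interval and batch size values simply echo the example from the commit description.

```ini
# Sketch only: enable periodic state saving (feature toggle assumed to be
# switched on this way) together with jittered batch writes.
[feature_toggles]
alertingSaveStatePeriodic = true

[unified_alerting]
state_periodic_save_interval = 5m
state_periodic_save_batch_size = 4000
state_periodic_save_jitter_enabled = true
```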
@@ -368,19 +368,21 @@ func (ng *AlertNG) init() error {
    ng.InstanceStore, ng.StartupInstanceReader = initInstanceStore(ng.store.SQLStore, ng.Log, ng.FeatureToggles)

    stateManagerCfg := state.ManagerCfg{
-        Metrics: ng.Metrics.GetStateMetrics(),
-        ExternalURL: appUrl,
-        DisableExecution: !ng.Cfg.UnifiedAlerting.ExecuteAlerts,
-        InstanceStore: ng.InstanceStore,
-        Images: ng.ImageService,
-        Clock: clk,
-        Historian: history,
-        MaxStateSaveConcurrency: ng.Cfg.UnifiedAlerting.MaxStateSaveConcurrency,
-        StatePeriodicSaveBatchSize: ng.Cfg.UnifiedAlerting.StatePeriodicSaveBatchSize,
-        RulesPerRuleGroupLimit: ng.Cfg.UnifiedAlerting.RulesPerRuleGroupLimit,
-        Tracer: ng.tracer,
-        Log: log.New("ngalert.state.manager"),
-        ResolvedRetention: ng.Cfg.UnifiedAlerting.ResolvedAlertRetention,
+        Metrics: ng.Metrics.GetStateMetrics(),
+        ExternalURL: appUrl,
+        DisableExecution: !ng.Cfg.UnifiedAlerting.ExecuteAlerts,
+        InstanceStore: ng.InstanceStore,
+        Images: ng.ImageService,
+        Clock: clk,
+        Historian: history,
+        MaxStateSaveConcurrency: ng.Cfg.UnifiedAlerting.MaxStateSaveConcurrency,
+        StatePeriodicSaveBatchSize: ng.Cfg.UnifiedAlerting.StatePeriodicSaveBatchSize,
+        StatePeriodicSaveJitterEnabled: ng.Cfg.UnifiedAlerting.StatePeriodicSaveJitterEnabled,
+        StatePeriodicSaveInterval: ng.Cfg.UnifiedAlerting.StatePeriodicSaveInterval,
+        RulesPerRuleGroupLimit: ng.Cfg.UnifiedAlerting.RulesPerRuleGroupLimit,
+        Tracer: ng.tracer,
+        Log: log.New("ngalert.state.manager"),
+        ResolvedRetention: ng.Cfg.UnifiedAlerting.ResolvedAlertRetention,
    }
    statePersister := initStatePersister(ng.Cfg.UnifiedAlerting, stateManagerCfg, ng.FeatureToggles)
    stateManager := state.NewManager(stateManagerCfg, statePersister)
@@ -72,6 +72,10 @@ type ManagerCfg struct {
    // StatePeriodicSaveBatchSize controls the size of the alert instance batch that is saved periodically when the
    // alertingSaveStatePeriodic feature flag is enabled.
    StatePeriodicSaveBatchSize int
+    // StatePeriodicSaveInterval controls the interval for periodic state saves.
+    StatePeriodicSaveInterval time.Duration
+    // StatePeriodicSaveJitterEnabled enables jitter for periodic state saves to distribute database load.
+    StatePeriodicSaveJitterEnabled bool

    RulesPerRuleGroupLimit int64
@@ -2,6 +2,7 @@ package state

import (
    "context"
+    "time"

    "github.com/grafana/grafana/pkg/services/ngalert/models"
    history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
@@ -25,7 +26,10 @@ type InstanceWriter interface {
    // SaveAlertInstancesForRule overwrites the state for the given rule.
    SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error
    DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error
-    FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error
+    // FullSync performs a full synchronization of alert instances.
+    // If jitterFunc is provided, applies jitter delays between batches to distribute database load.
+    // If jitterFunc is nil, executes batches without delays.
+    FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int, jitterFunc func(int) time.Duration) error
}

type OrgReader interface {
@@ -17,20 +17,24 @@ type AlertInstancesProvider interface {
}

type AsyncStatePersister struct {
-    log log.Logger
-    batchSize int
-    store InstanceStore
-    ticker *clock.Ticker
-    metrics *metrics.State
+    log log.Logger
+    batchSize int
+    store InstanceStore
+    ticker *clock.Ticker
+    metrics *metrics.State
+    jitterEnabled bool
+    interval time.Duration
}

func NewAsyncStatePersister(log log.Logger, ticker *clock.Ticker, cfg ManagerCfg) StatePersister {
    return &AsyncStatePersister{
-        log: log,
-        store: cfg.InstanceStore,
-        ticker: ticker,
-        batchSize: cfg.StatePeriodicSaveBatchSize,
-        metrics: cfg.Metrics,
+        log: log,
+        store: cfg.InstanceStore,
+        ticker: ticker,
+        batchSize: cfg.StatePeriodicSaveBatchSize,
+        metrics: cfg.Metrics,
+        jitterEnabled: cfg.StatePeriodicSaveJitterEnabled,
+        interval: cfg.StatePeriodicSaveInterval,
    }
}
@@ -57,7 +61,14 @@ func (a *AsyncStatePersister) fullSync(ctx context.Context, instancesProvider Al
    startTime := time.Now()
    a.log.Debug("Full state sync start")
    instances := instancesProvider.GetAlertInstances()
-    if err := a.store.FullSync(ctx, instances, a.batchSize); err != nil {
+
+    var jitterFunc func(int) time.Duration
+    if a.jitterEnabled {
+        jitterFunc = a.createJitterFunc(instances)
+    }
+
+    err := a.store.FullSync(ctx, instances, a.batchSize, jitterFunc)
+    if err != nil {
        a.log.Error("Full state sync failed", "duration", time.Since(startTime), "instances", len(instances))
        return err
    }
@@ -68,6 +79,31 @@ func (a *AsyncStatePersister) fullSync(ctx context.Context, instancesProvider Al
    return nil
}

+func (a *AsyncStatePersister) calculateBatchJitterDelay(batchIndex, totalBatches int, window time.Duration) time.Duration {
+    if totalBatches <= 1 {
+        return 0
+    }
+
+    // Distribute batches evenly across the window
+    ratio := float64(batchIndex) / float64(totalBatches-1)
+    delay := time.Duration(float64(window) * ratio)
+
+    return delay
+}
+
+// createJitterFunc creates a jitter function for the given instances
+func (a *AsyncStatePersister) createJitterFunc(instances []models.AlertInstance) func(int) time.Duration {
+    safetyRatio := 0.85
+    availableWindow := time.Duration(float64(a.interval) * safetyRatio)
+    totalBatches := (len(instances) + a.batchSize - 1) / a.batchSize
+
+    a.log.Debug("Creating jitter function", "instances", len(instances), "batches", totalBatches, "window", availableWindow)
+
+    return func(batchIndex int) time.Duration {
+        return a.calculateBatchJitterDelay(batchIndex, totalBatches, availableWindow)
+    }
+}
+
func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _ models.AlertRuleKeyWithGroup, _ StateTransitions) {
    a.log.Debug("Sync: No-Op")
}
@@ -6,6 +6,7 @@ import (
    "math/rand"
    "slices"
    "sync"
+    "time"

    "github.com/grafana/grafana/pkg/services/ngalert/models"
    history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
@@ -75,7 +76,7 @@ func (f *FakeInstanceStore) DeleteAlertInstancesByRule(ctx context.Context, key
    return nil
}

-func (f *FakeInstanceStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error {
+func (f *FakeInstanceStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int, jitterFunc func(int) time.Duration) error {
    f.mtx.Lock()
    defer f.mtx.Unlock()
    f.recordedOps = []any{}
@@ -14,6 +14,42 @@ import (
    "github.com/grafana/grafana/pkg/services/sqlstore"
)

+// jitteredBatch represents a batch of alert instances with associated jitter delay
+type jitteredBatch struct {
+    index int
+    instances []models.AlertInstance
+    delay time.Duration
+}
+
+// createJitteredBatches splits instances into batches and calculates jitter delays
+func createJitteredBatches(instances []models.AlertInstance, batchSize int, jitterFunc func(int) time.Duration, logger log.Logger) []jitteredBatch {
+    if len(instances) == 0 {
+        return nil
+    }
+
+    var batches []jitteredBatch
+    totalInstances := len(instances)
+
+    for start := 0; start < totalInstances; start += batchSize {
+        end := start + batchSize
+        if end > totalInstances {
+            end = totalInstances
+        }
+
+        batchIndex := start / batchSize
+        batch := instances[start:end]
+        delay := jitterFunc(batchIndex)
+
+        batches = append(batches, jitteredBatch{
+            index: batchIndex,
+            instances: batch,
+            delay: delay,
+        })
+    }
+
+    return batches
+}
+
type InstanceDBStore struct {
    SQLStore db.DB
    Logger log.Logger
@@ -211,7 +247,10 @@ func (st InstanceDBStore) DeleteAlertInstancesByRule(ctx context.Context, key mo
//
// The batchSize parameter controls how many instances are inserted per batch. Increasing batchSize can improve
// performance for large datasets, but can also increase load on the database.
-func (st InstanceDBStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error {
+//
+// If jitterFunc is provided, applies jitter delays between batches to distribute database load over time.
+// If jitterFunc is nil, executes batches without delays for standard behavior.
+func (st InstanceDBStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int, jitterFunc func(int) time.Duration) error {
    if len(instances) == 0 {
        return nil
    }
@@ -220,6 +259,12 @@ func (st InstanceDBStore) FullSync(ctx context.Context, instances []models.Alert
        batchSize = 1
    }

+    // If jitter is enabled, use the jittered approach
+    if jitterFunc != nil {
+        return st.fullSyncWithJitter(ctx, instances, batchSize, jitterFunc)
+    }
+
+    // Otherwise, use the standard approach without jitter
    return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
        // First we delete all records from the table
        if _, err := sess.Exec("DELETE FROM alert_instance"); err != nil {
@@ -240,6 +285,64 @@ func (st InstanceDBStore) FullSync(ctx context.Context, instances []models.Alert
            }
        }

        return nil
    })
}

+// fullSyncWithJitter performs a full synchronization with jitter delays between batches.
+//
+// This method maintains atomicity by performing all operations within a single transaction,
+// while distributing the INSERT operations over time to reduce database load spikes.
+//
+// The instances parameter should be a flat list of all alert instances.
+// The jitterFunc should return the delay duration for a given batch index.
+func (st InstanceDBStore) fullSyncWithJitter(ctx context.Context, instances []models.AlertInstance, batchSize int, jitterFunc func(int) time.Duration) error {
+    if len(instances) == 0 {
+        return nil
+    }
+
+    if batchSize <= 0 {
+        batchSize = 1
+    }
+
+    // Prepare all batches and sorting OUTSIDE the transaction
+    batches := createJitteredBatches(instances, batchSize, jitterFunc, st.Logger)
+
+    // Sort batches by delay time (ascending)
+    sort.Slice(batches, func(i, j int) bool {
+        return batches[i].delay < batches[j].delay
+    })
+
+    // Execute the optimized transaction with pre-calculated batches
+    return st.executeJitteredBatchesInTransaction(ctx, batches)
+}
+
+// executeJitteredBatchesInTransaction executes pre-calculated batches within a single transaction
+// with jitter delays. All preparation work should be done before calling this method.
+func (st InstanceDBStore) executeJitteredBatchesInTransaction(ctx context.Context, batches []jitteredBatch) error {
+    return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
+        // Capture the actual transaction start time for accurate delay calculations
+        transactionStartTime := time.Now()
+
+        // First we delete all records from the table
+        if _, err := sess.Exec("DELETE FROM alert_instance"); err != nil {
+            return fmt.Errorf("failed to delete alert_instance table: %w", err)
+        }
+
+        // Execute batches in order with absolute time-based delays using transaction start time
+        for _, batch := range batches {
+            // Calculate target time and wait until then
+            targetTime := transactionStartTime.Add(batch.delay)
+            if sleepDuration := time.Until(targetTime); sleepDuration > 0 {
+                time.Sleep(sleepDuration)
+            }
+
+            // Insert this batch
+            if err := st.insertInstancesBatch(sess, batch.instances); err != nil {
+                return fmt.Errorf("failed to insert batch %d [%d instances]: %w", batch.index, len(batch.instances), err)
+            }
+        }
+
+        if err := sess.Commit(); err != nil {
+            return fmt.Errorf("failed to commit alert_instance table: %w", err)
+        }
@@ -8,6 +8,7 @@ import (
    "time"

    "github.com/golang/snappy"
+    "github.com/grafana/grafana/pkg/util/testutil"
    "github.com/stretchr/testify/require"
    "google.golang.org/protobuf/proto"

@@ -17,7 +18,6 @@
    pb "github.com/grafana/grafana/pkg/services/ngalert/store/proto/v1"
    "github.com/grafana/grafana/pkg/services/ngalert/tests"
    "github.com/grafana/grafana/pkg/util"
-    "github.com/grafana/grafana/pkg/util/testutil"
)

const baseIntervalSeconds = 10
@@ -316,7 +316,7 @@ func TestIntegrationFullSync(t *testing.T) {
    }

    t.Run("Should do a proper full sync", func(t *testing.T) {
-        err := ng.InstanceStore.FullSync(ctx, instances, batchSize)
+        err := ng.InstanceStore.FullSync(ctx, instances, batchSize, nil)
        require.NoError(t, err)

        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
@@ -339,7 +339,7 @@ func TestIntegrationFullSync(t *testing.T) {
    })

    t.Run("Should remove non existing entries on sync", func(t *testing.T) {
-        err := ng.InstanceStore.FullSync(ctx, instances[1:], batchSize)
+        err := ng.InstanceStore.FullSync(ctx, instances[1:], batchSize, nil)
        require.NoError(t, err)

        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
@@ -356,7 +356,7 @@ func TestIntegrationFullSync(t *testing.T) {

    t.Run("Should add new entries on sync", func(t *testing.T) {
        newRuleUID := "y"
-        err := ng.InstanceStore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize)
+        err := ng.InstanceStore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize, nil)
        require.NoError(t, err)

        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
@@ -381,7 +381,7 @@ func TestIntegrationFullSync(t *testing.T) {
    t.Run("Should save all instances when batch size is bigger than 1", func(t *testing.T) {
        batchSize = 2
        newRuleUID := "y"
-        err := ng.InstanceStore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize)
+        err := ng.InstanceStore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize, nil)
        require.NoError(t, err)

        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
@@ -409,12 +409,12 @@ func TestIntegrationFullSync(t *testing.T) {
            generateTestAlertInstance(orgID, "preexisting-1"),
            generateTestAlertInstance(orgID, "preexisting-2"),
        }
-        err := ng.InstanceStore.FullSync(ctx, initialInstances, 5)
+        err := ng.InstanceStore.FullSync(ctx, initialInstances, 5, nil)
        require.NoError(t, err)

        // Now call FullSync with no instances. According to the code, this should return nil
        // and should not delete anything in the table.
-        err = ng.InstanceStore.FullSync(ctx, []models.AlertInstance{}, 5)
+        err = ng.InstanceStore.FullSync(ctx, []models.AlertInstance{}, 5, nil)
        require.NoError(t, err)

        // Check that the previously inserted instances are still present.
@@ -445,7 +445,7 @@ func TestIntegrationFullSync(t *testing.T) {
        // Make the invalid instance actually invalid
        invalidInstance.RuleUID = ""

-        err := ng.InstanceStore.FullSync(ctx, []models.AlertInstance{validInstance, invalidInstance}, 2)
+        err := ng.InstanceStore.FullSync(ctx, []models.AlertInstance{validInstance, invalidInstance}, 2, nil)
        require.NoError(t, err)

        // Only the valid instance should be saved.
@@ -464,7 +464,7 @@ func TestIntegrationFullSync(t *testing.T) {
            generateTestAlertInstance(orgID, "batch-test2"),
        }

-        err := ng.InstanceStore.FullSync(ctx, smallSet, 100)
+        err := ng.InstanceStore.FullSync(ctx, smallSet, 100, nil)
        require.NoError(t, err)

        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
@@ -487,7 +487,7 @@ func TestIntegrationFullSync(t *testing.T) {

    t.Run("Should handle a large set of instances with a moderate batchSize", func(t *testing.T) {
        // Clear everything first.
-        err := ng.InstanceStore.FullSync(ctx, []models.AlertInstance{}, 1)
+        err := ng.InstanceStore.FullSync(ctx, []models.AlertInstance{}, 1, nil)
        require.NoError(t, err)

        largeCount := 300
@@ -496,7 +496,7 @@ func TestIntegrationFullSync(t *testing.T) {
            largeSet[i] = generateTestAlertInstance(orgID, fmt.Sprintf("large-%d", i))
        }

-        err = ng.InstanceStore.FullSync(ctx, largeSet, 50)
+        err = ng.InstanceStore.FullSync(ctx, largeSet, 50, nil)
        require.NoError(t, err)

        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
@@ -507,6 +507,135 @@ func TestIntegrationFullSync(t *testing.T) {
    })
}

+func TestIntegrationFullSyncWithJitter(t *testing.T) {
+    testutil.SkipIntegrationTestInShortMode(t)
+
+    batchSize := 2
+
+    ctx := context.Background()
+    ng, _ := tests.SetupTestEnv(t, baseIntervalSeconds)
+
+    orgID := int64(1)
+    ruleUIDs := []string{"j1", "j2", "j3", "j4", "j5"}
+
+    instances := make([]models.AlertInstance, len(ruleUIDs))
+    for i, ruleUID := range ruleUIDs {
+        instances[i] = generateTestAlertInstance(orgID, ruleUID)
+    }
+
+    // Simple jitter function for testing
+    jitterFunc := func(batchIndex int) time.Duration {
+        return time.Duration(batchIndex*100) * time.Millisecond
+    }
+
+    t.Run("Should do a proper full sync with jitter", func(t *testing.T) {
+        err := ng.InstanceStore.FullSync(ctx, instances, batchSize, jitterFunc)
+        require.NoError(t, err)
+
+        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+            RuleOrgID: orgID,
+        })
+        require.NoError(t, err)
+        require.Len(t, res, len(instances))
+
+        // Verify all instances were saved
+        for _, ruleUID := range ruleUIDs {
+            found := false
+            for _, instance := range res {
+                if instance.RuleUID == ruleUID {
+                    found = true
+                    break
+                }
+            }
+            require.True(t, found, "Instance with RuleUID '%s' not found", ruleUID)
+        }
+    })
+
+    t.Run("Should handle empty instances with jitter", func(t *testing.T) {
+        err := ng.InstanceStore.FullSync(ctx, []models.AlertInstance{}, batchSize, jitterFunc)
+        require.NoError(t, err)
+
+        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+            RuleOrgID: orgID,
+        })
+        require.NoError(t, err)
+        require.Len(t, res, len(instances), "Empty sync should not delete existing instances")
+    })
+
+    t.Run("Should handle zero delays (immediate execution)", func(t *testing.T) {
+        testInstances := make([]models.AlertInstance, 2)
+        for i := 0; i < 2; i++ {
+            testInstances[i] = generateTestAlertInstance(orgID, fmt.Sprintf("immediate-%d", i))
+        }
+
+        // Function that returns zero delays
+        immediateJitterFunc := func(batchIndex int) time.Duration {
+            return 0 * time.Second
+        }
+
+        start := time.Now()
+        err := ng.InstanceStore.FullSync(ctx, testInstances, 1, immediateJitterFunc)
+        elapsed := time.Since(start)
+        require.NoError(t, err)
+
+        // Should complete quickly since all delays are zero
+        require.Less(t, elapsed, 500*time.Millisecond, "Zero delays should execute immediately")
+
+        // Verify data was saved
+        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+            RuleOrgID: orgID,
+        })
+        require.NoError(t, err)
+        require.Len(t, res, 2)
+    })
+
+    t.Run("Should execute jitter delays correctly and save data", func(t *testing.T) {
+        testInstances := make([]models.AlertInstance, 4)
+        for i := 0; i < 4; i++ {
+            testInstances[i] = generateTestAlertInstance(orgID, fmt.Sprintf("jitter-test-%d", i))
+        }
+
+        // Track jitter function calls
+        jitterCalls := []int{}
+        realJitterFunc := func(batchIndex int) time.Duration {
+            jitterCalls = append(jitterCalls, batchIndex)
+            return time.Duration(batchIndex*200) * time.Millisecond // 0ms, 200ms delays
+        }
+
+        start := time.Now()
+        err := ng.InstanceStore.FullSync(ctx, testInstances, 2, realJitterFunc)
+        elapsed := time.Since(start)
+        require.NoError(t, err)
+
+        // Should take at least the maximum delay (200ms for batch 1)
+        require.GreaterOrEqual(t, elapsed, 200*time.Millisecond, "Should wait for jitter delays")
+        require.Less(t, elapsed, 1*time.Second, "Should not take too long")
+
+        // Verify jitter function was called for each batch
+        require.Equal(t, []int{0, 1}, jitterCalls, "Should call jitter function for each batch")
+
+        // Verify all data was saved correctly
+        res, err := ng.InstanceStore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
+            RuleOrgID: orgID,
+        })
+        require.NoError(t, err)
+        require.Len(t, res, 4)
+
+        // Verify specific instances were saved
+        for i := 0; i < 4; i++ {
+            expectedUID := fmt.Sprintf("jitter-test-%d", i)
+            found := false
+            for _, instance := range res {
+                if instance.RuleUID == expectedUID {
+                    found = true
+                    break
+                }
+            }
+            require.True(t, found, "Instance with RuleUID '%s' not found", expectedUID)
+        }
+    })
+}
+
func TestIntegration_ProtoInstanceDBStore_VerifyCompressedData(t *testing.T) {
    testutil.SkipIntegrationTestInShortMode(t)

@@ -133,7 +133,7 @@ func (st ProtoInstanceDBStore) DeleteAlertInstancesByRule(ctx context.Context, k
    })
}

-func (st ProtoInstanceDBStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error {
+func (st ProtoInstanceDBStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int, jitterFunc func(int) time.Duration) error {
    logger := st.Logger.FromContext(ctx)
    logger.Error("FullSync called and not implemented")
    return errors.New("fullsync is not implemented for proto instance database store")
@@ -134,10 +134,11 @@ type UnifiedAlertingSettings struct {
    PrometheusConversion UnifiedAlertingPrometheusConversionSettings

    // MaxStateSaveConcurrency controls the number of goroutines (per rule) that can save alert state in parallel.
-    MaxStateSaveConcurrency int
-    StatePeriodicSaveInterval time.Duration
-    StatePeriodicSaveBatchSize int
-    RulesPerRuleGroupLimit int64
+    MaxStateSaveConcurrency int
+    StatePeriodicSaveInterval time.Duration
+    StatePeriodicSaveBatchSize int
+    StatePeriodicSaveJitterEnabled bool
+    RulesPerRuleGroupLimit int64

    // Retention period for Alertmanager notification log entries.
    NotificationLogRetention time.Duration
@@ -560,6 +561,8 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error {

    uaCfg.StatePeriodicSaveBatchSize = ua.Key("state_periodic_save_batch_size").MustInt(1)

+    uaCfg.StatePeriodicSaveJitterEnabled = ua.Key("state_periodic_save_jitter_enabled").MustBool(false)
+
    uaCfg.NotificationLogRetention, err = gtime.ParseDuration(valueAsString(ua, "notification_log_retention", (5 * 24 * time.Hour).String()))
    if err != nil {
        return err