Alerting: Stop firing of alert when it is updated (#39975)

* Update the API to call the scheduler when an alert rule is removed or updated. When a user updates a rule, the scheduler removes the currently firing alert instances and cleans up the state cache.
* Update the evaluation loop in the scheduler to support one more channel that is used to communicate rule updates to it (see the sketch below).
* Improve rule deletion from the internal registry.
* Move the alert rule version from the internal registry (the alertRuleInfo structure) closer to the rule evaluation loop (into the evaluation task structure), which makes the registry values immutable.
* Extract the notification code into a separate function so it can be reused in the update flow.
Yuriy Tseretyan 2022-01-11 11:39:34 -05:00 committed by GitHub
parent f6414ea2b2
commit ed5c664e4a
7 changed files with 526 additions and 69 deletions
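The heart of the change is a second channel per rule evaluation routine. Below is a minimal sketch of that pattern, not code from this commit: fetchRule, expireFiringAlerts and evaluate are hypothetical stand-ins for the updateRule and clearState closures and the evaluation logic that appear later in pkg/services/ngalert/schedule/schedule.go.

// Minimal sketch (not part of this diff) of a per-rule evaluation routine with the
// new update channel. evalCh delivers scheduled evaluation ticks; updateCh is the
// channel the API uses, via ScheduleService.UpdateAlertRule, to signal that the rule changed.
func ruleRoutineSketch(
	ctx context.Context,
	evalCh <-chan *evalContext,
	updateCh <-chan struct{},
	fetchRule func() (*models.AlertRule, error), // stand-in for the updateRule closure
	expireFiringAlerts func(),                   // stand-in for the clearState closure
	evaluate func(*models.AlertRule, *evalContext), // stand-in for the evaluation step
) error {
	var current *models.AlertRule
	for {
		select {
		case <-updateCh:
			// A user changed the rule: re-fetch it and, if the version grew,
			// expire the currently firing alerts and drop the cached state.
			newRule, err := fetchRule()
			if err != nil {
				continue
			}
			if current != nil && current.Version < newRule.Version {
				expireFiringAlerts()
			}
			current = newRule
		case evalCtx, ok := <-evalCh:
			if !ok {
				// The evaluation channel was closed: exit the routine.
				return nil
			}
			evaluate(current, evalCtx)
		case <-ctx.Done():
			// The rule was deleted or the scheduler is stopping: expire what is firing.
			expireFiringAlerts()
			return nil
		}
	}
}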

View File

@@ -95,7 +95,7 @@ func (api *API) RegisterAPIEndpoints(m *metrics.API) {
api.RegisterRulerApiEndpoints(NewForkedRuler(
api.DatasourceCache,
NewLotexRuler(proxy, logger),
RulerSrv{DatasourceCache: api.DatasourceCache, QuotaService: api.QuotaService, manager: api.StateManager, store: api.RuleStore, log: logger},
RulerSrv{DatasourceCache: api.DatasourceCache, QuotaService: api.QuotaService, scheduleService: api.Schedule, store: api.RuleStore, log: logger},
), m)
api.RegisterTestingApiEndpoints(NewForkedTestingApi(
TestingApiSrv{

View File

@@ -6,26 +6,28 @@ import (
"net/http"
"time"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/services/quota"
"github.com/prometheus/common/model"
"github.com/grafana/grafana/pkg/api/apierrors"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/datasources"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/services/quota"
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
"github.com/grafana/grafana/pkg/util"
"github.com/grafana/grafana/pkg/web"
"github.com/prometheus/common/model"
)
type RulerSrv struct {
store store.RuleStore
DatasourceCache datasources.CacheService
QuotaService *quota.QuotaService
manager *state.Manager
scheduleService schedule.ScheduleService
log log.Logger
}
@@ -42,7 +44,10 @@ func (srv RulerSrv) RouteDeleteNamespaceRulesConfig(c *models.ReqContext) respon
}
for _, uid := range uids {
srv.manager.RemoveByRuleUID(c.SignedInUser.OrgId, uid)
srv.scheduleService.DeleteAlertRule(ngmodels.AlertRuleKey{
OrgID: c.SignedInUser.OrgId,
UID: uid,
})
}
return response.JSON(http.StatusAccepted, util.DynMap{"message": "namespace rules deleted"})
@@ -65,7 +70,10 @@ func (srv RulerSrv) RouteDeleteRuleGroupConfig(c *models.ReqContext) response.Re
}
for _, uid := range uids {
srv.manager.RemoveByRuleUID(c.SignedInUser.OrgId, uid)
srv.scheduleService.DeleteAlertRule(ngmodels.AlertRuleKey{
OrgID: c.SignedInUser.OrgId,
UID: uid,
})
}
return response.JSON(http.StatusAccepted, util.DynMap{"message": "rule group deleted"})
@@ -288,7 +296,10 @@ func (srv RulerSrv) RoutePostNameRulesConfig(c *models.ReqContext, ruleGroupConf
}
for uid := range alertRuleUIDs {
srv.manager.RemoveByRuleUID(c.OrgId, uid)
srv.scheduleService.UpdateAlertRule(ngmodels.AlertRuleKey{
OrgID: c.SignedInUser.OrgId,
UID: uid,
})
}
return response.JSON(http.StatusAccepted, util.DynMap{"message": "rule group updated successfully"})

View File

@@ -6,6 +6,7 @@ import (
"path"
"time"
"github.com/benbjohnson/clock"
"github.com/go-openapi/strfmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/prometheus/alertmanager/api/v2/models"
@@ -125,3 +126,19 @@ func FromAlertStateToPostableAlerts(firingStates []*state.State, stateManager *s
stateManager.Put(sentAlerts)
return alerts
}
// FromAlertsStateToStoppedAlert converts firing states (eval.Alerting, eval.NoData, or eval.Error) to models.PostableAlert accepted by notifiers.
// The returned alerts have EndsAt set to the current time, so receivers treat them as resolved.
func FromAlertsStateToStoppedAlert(firingStates []*state.State, appURL *url.URL, clock clock.Clock) apimodels.PostableAlerts {
alerts := apimodels.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(firingStates))}
ts := clock.Now()
for _, alertState := range firingStates {
if alertState.State == eval.Normal || alertState.State == eval.Pending {
continue
}
postableAlert := stateToPostableAlert(alertState, appURL)
postableAlert.EndsAt = strfmt.DateTime(ts)
alerts.PostableAlerts = append(alerts.PostableAlerts, *postableAlert)
}
return alerts
}
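The resulting alerts carry EndsAt set to the conversion time, which Alertmanager interprets as the alerts being resolved. A minimal usage sketch, not part of this diff, mirroring what the scheduler's new clearState closure does; send is a hypothetical stand-in for the notification helper introduced in schedule.go:

// Sketch: expire all cached alerts for a rule and hand them to a notifier so the
// corresponding notifications are resolved.
func expireRuleAlertsSketch(orgID int64, ruleUID string, manager *state.Manager, appURL *url.URL, clk clock.Clock, send func(apimodels.PostableAlerts)) {
	states := manager.GetStatesForRuleUID(orgID, ruleUID)
	stopped := FromAlertsStateToStoppedAlert(states, appURL, clk) // EndsAt = clk.Now()
	manager.RemoveByRuleUID(orgID, ruleUID)                       // drop the cached state
	send(stopped)                                                 // receivers treat these alerts as resolved
}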

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/go-openapi/strfmt"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/common/model"
@@ -193,6 +194,37 @@ func Test_stateToPostableAlert(t *testing.T) {
}
}
func Test_FromAlertsStateToStoppedAlert(t *testing.T) {
appURL := &url.URL{
Scheme: "http:",
Host: fmt.Sprintf("host-%d", rand.Int()),
Path: fmt.Sprintf("path-%d", rand.Int()),
}
evalStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.Error, eval.NoData}
states := make([]*state.State, 0, len(evalStates))
for _, s := range evalStates {
states = append(states, randomState(s))
}
clk := clock.NewMock()
clk.Set(time.Now())
expected := make([]models.PostableAlert, 0, len(states))
for _, s := range states {
if !(s.State == eval.Alerting || s.State == eval.Error || s.State == eval.NoData) {
continue
}
alert := stateToPostableAlert(s, appURL)
alert.EndsAt = strfmt.DateTime(clk.Now())
expected = append(expected, *alert)
}
result := FromAlertsStateToStoppedAlert(states, appURL, clk)
require.Equal(t, expected, result.PostableAlerts)
}
func randomMapOfStrings() map[string]string {
max := 5
result := make(map[string]string, max)

View File

@@ -11,6 +11,7 @@ import (
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -39,7 +40,10 @@ type ScheduleService interface {
// DroppedAlertmanagersFor returns all the dropped Alertmanager URLs for the
// organization.
DroppedAlertmanagersFor(orgID int64) []*url.URL
// UpdateAlertRule notifies scheduler that a rule has been changed
UpdateAlertRule(key models.AlertRuleKey)
// DeleteAlertRule notifies scheduler that a rule has been deleted
DeleteAlertRule(key models.AlertRuleKey)
// the following are used by tests only
evalApplied(models.AlertRuleKey, time.Time)
stopApplied(models.AlertRuleKey)
@@ -307,6 +311,26 @@ func (sch *schedule) DroppedAlertmanagersFor(orgID int64) []*url.URL {
return s.DroppedAlertmanagers()
}
// UpdateAlertRule looks for the active rule evaluation and commands it to update the rule
func (sch *schedule) UpdateAlertRule(key models.AlertRuleKey) {
ruleInfo, err := sch.registry.get(key)
if err != nil {
return
}
ruleInfo.update()
}
// DeleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.
func (sch *schedule) DeleteAlertRule(key models.AlertRuleKey) {
ruleInfo, ok := sch.registry.del(key)
if !ok {
sch.log.Info("unable to delete alert rule routine information by key", "uid", key.UID, "org_id", key.OrgID)
return
}
// stop rule evaluation
ruleInfo.stop()
}
func (sch *schedule) adminConfigSync(ctx context.Context) error {
for {
select {
@@ -369,7 +393,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error {
return sch.ruleRoutine(ruleInfo.ctx, key, ruleInfo.evalCh)
return sch.ruleRoutine(ruleInfo.ctx, key, ruleInfo.evalCh, ruleInfo.updateCh)
})
}
@@ -407,12 +431,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
// unregister and stop routines of the deleted alert rules
for key := range registeredDefinitions {
ruleInfo, ok := sch.registry.del(key)
if !ok {
sch.log.Error("unable to delete alert rule routine information because it did not exist", "uid", key.UID, "org_id", key.OrgID)
continue
}
ruleInfo.stop()
sch.DeleteAlertRule(key)
}
case <-ctx.Done():
waitErr := dispatcherGroup.Wait()
@@ -432,7 +451,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
}
}
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext) error {
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, updateCh <-chan struct{}) error {
logger := sch.log.New("uid", key.UID, "org", key.OrgID)
logger.Debug("alert rule routine started")
@@ -441,13 +460,60 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID)
evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID)
updateRule := func() (*models.AlertRule, error) {
notify := func(alerts definitions.PostableAlerts, logger log.Logger) {
if len(alerts.PostableAlerts) == 0 {
logger.Debug("no alerts to put in the notifier")
return
}
var localNotifierExist, externalNotifierExist bool
logger.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
n, err := sch.multiOrgNotifier.AlertmanagerFor(key.OrgID)
if err == nil {
localNotifierExist = true
if err := n.PutAlerts(alerts); err != nil {
logger.Error("failed to put alerts in the local notifier", "count", len(alerts.PostableAlerts), "err", err)
}
} else {
if errors.Is(err, notifier.ErrNoAlertmanagerForOrg) {
logger.Debug("local notifier was not found")
} else {
logger.Error("local notifier is not available", "err", err)
}
}
// Send alerts to external Alertmanager(s) if we have a sender for this organization.
sch.sendersMtx.RLock()
defer sch.sendersMtx.RUnlock()
s, ok := sch.senders[key.OrgID]
if ok {
logger.Debug("sending alerts to external notifier", "count", len(alerts.PostableAlerts))
s.SendAlerts(alerts)
externalNotifierExist = true
}
if !localNotifierExist && !externalNotifierExist {
logger.Error("no external or internal notifier - alerts not delivered!", "count", len(alerts.PostableAlerts))
}
}
clearState := func() {
states := sch.stateManager.GetStatesForRuleUID(key.OrgID, key.UID)
expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
sch.stateManager.RemoveByRuleUID(key.OrgID, key.UID)
notify(expiredAlerts, logger)
}
updateRule := func(oldRule *models.AlertRule) (*models.AlertRule, error) {
q := models.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID}
err := sch.ruleStore.GetAlertRuleByUID(&q)
if err != nil {
logger.Error("failed to fetch alert rule", "err", err)
return nil, err
}
if oldRule != nil && oldRule.Version < q.Result.Version {
clearState()
}
return q.Result, nil
}
@@ -476,40 +542,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
sch.saveAlertStates(processedStates)
alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
if len(alerts.PostableAlerts) == 0 {
logger.Debug("no alerts to put in the notifier")
return nil
}
var localNotifierExist, externalNotifierExist bool
logger.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
n, err := sch.multiOrgNotifier.AlertmanagerFor(alertRule.OrgID)
if err == nil {
localNotifierExist = true
if err := n.PutAlerts(alerts); err != nil {
logger.Error("failed to put alerts in the local notifier", "count", len(alerts.PostableAlerts), "err", err)
}
} else {
if errors.Is(err, notifier.ErrNoAlertmanagerForOrg) {
logger.Debug("local notifier was not found")
} else {
logger.Error("local notifier is not available", "err", err)
}
}
// Send alerts to external Alertmanager(s) if we have a sender for this organization.
sch.sendersMtx.RLock()
defer sch.sendersMtx.RUnlock()
s, ok := sch.senders[alertRule.OrgID]
if ok {
logger.Debug("sending alerts to external notifier", "count", len(alerts.PostableAlerts))
s.SendAlerts(alerts)
externalNotifierExist = true
}
if !localNotifierExist && !externalNotifierExist {
logger.Error("no external or internal notifier - alerts not delivered!", "count", len(alerts.PostableAlerts))
}
notify(alerts, logger)
return nil
}
@@ -530,6 +563,22 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
defer sch.stopApplied(key)
for {
select {
// used by external services (API) to notify that the rule has been updated.
case <-updateCh:
logger.Info("fetching new version of the rule")
err := retryIfError(func(attempt int64) error {
newRule, err := updateRule(currentRule)
if err != nil {
return err
}
logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version)
currentRule = newRule
return nil
})
if err != nil {
logger.Error("updating rule failed after all retries", "error", err)
}
// evalCh - used by the scheduler to signal that evaluation is needed.
case ctx, ok := <-evalCh:
if !ok {
logger.Debug("Evaluation channel has been closed. Exiting")
@@ -549,7 +598,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
err := retryIfError(func(attempt int64) error {
// fetch latest alert rule version
if currentRule == nil || currentRule.Version < ctx.version {
newRule, err := updateRule()
newRule, err := updateRule(currentRule)
if err != nil {
return err
}
@@ -563,6 +612,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
}
}()
case <-grafanaCtx.Done():
clearState()
logger.Debug("stopping alert rule routine")
return nil
}
@@ -607,6 +657,27 @@ func (r *alertRuleRegistry) getOrCreateInfo(context context.Context, key models.
return info, !ok
}
// get returns the rule routine information (alertRuleInfo) for the specific alert rule.
// Returns an error if the key does not exist.
func (r *alertRuleRegistry) get(key models.AlertRuleKey) (*alertRuleInfo, error) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if !ok {
return nil, fmt.Errorf("%v key not found", key)
}
return info, nil
}
func (r *alertRuleRegistry) exists(key models.AlertRuleKey) bool {
r.mu.Lock()
defer r.mu.Unlock()
_, ok := r.alertRuleInfo[key]
return ok
}
// del removes the pair with the specified key from alertRuleInfo.
// Returns the removed value and a bool that indicates whether an element
// with the specified key existed.
@@ -646,14 +717,15 @@ func (r *alertRuleRegistry) keyMap() map[models.AlertRuleKey]struct{} {
}
type alertRuleInfo struct {
evalCh chan *evalContext
ctx context.Context
stop context.CancelFunc
evalCh chan *evalContext
updateCh chan struct{}
ctx context.Context
stop context.CancelFunc
}
func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
ctx, cancel := context.WithCancel(parent)
return &alertRuleInfo{evalCh: make(chan *evalContext), ctx: ctx, stop: cancel}
return &alertRuleInfo{evalCh: make(chan *evalContext), updateCh: make(chan struct{}), ctx: ctx, stop: cancel}
}
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped
@@ -669,6 +741,16 @@ func (a *alertRuleInfo) eval(t time.Time, version int64) bool {
}
}
// update signals the rule evaluation routine to update the internal state. Does nothing if the loop is stopped
func (a *alertRuleInfo) update() bool {
select {
case a.updateCh <- struct{}{}:
return true
case <-a.ctx.Done():
return false
}
}
type evalContext struct {
now time.Time
version int64

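For context, retryIfError (called in ruleRoutine above) is not defined in this diff. Judging from its call sites and from the "should retry when database fails" test below, which sets sch.maxAttempts and expects that many fetch attempts, it re-invokes the callback until it succeeds or a maximum number of attempts is reached. A hedged sketch, with maxAttempts passed explicitly as a hypothetical parameter:

// Sketch (not from this diff) of a retry helper consistent with how ruleRoutine
// uses retryIfError: the callback receives the attempt number, and the last
// error is returned if every attempt fails.
func retryIfErrorSketch(maxAttempts int64, f func(attempt int64) error) error {
	var err error
	for attempt := int64(0); attempt < maxAttempts; attempt++ {
		if err = f(attempt); err == nil {
			return nil
		}
	}
	return err
}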
View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/url"
@@ -33,6 +34,7 @@ import (
"github.com/grafana/grafana/pkg/services/secrets/fakes"
secretsManager "github.com/grafana/grafana/pkg/services/secrets/manager"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
)
func TestSendingToExternalAlertmanager(t *testing.T) {
@@ -259,6 +261,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
// normal states do not include NoData and Error because currently it is not possible to perform any sensible test
normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
allStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.NoData, eval.Error}
randomNormalState := func() eval.State {
// pick only supported cases
return normalStates[rand.Intn(3)]
@@ -276,7 +279,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{}))
}()
expectedTime := time.UnixMicro(rand.Int63())
@@ -373,7 +376,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext))
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext), make(chan struct{}))
stoppedChan <- err
}()
@@ -394,7 +397,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{}))
}()
expectedTime := time.UnixMicro(rand.Int63())
@@ -446,7 +449,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{}))
}()
expectedTime := time.UnixMicro(rand.Int63())
@@ -485,6 +488,175 @@ func TestSchedule_ruleRoutine(t *testing.T) {
require.Len(t, queries, 1, "Expected exactly one request of %T", models.GetAlertRuleByUIDQuery{})
})
t.Run("when update channel is not empty", func(t *testing.T) {
t.Run("should fetch the alert rule from database", func(t *testing.T) {
evalChan := make(chan *evalContext)
evalAppliedChan := make(chan time.Time)
updateChan := make(chan struct{})
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), eval.Alerting) // we want the alert to fire
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan)
}()
updateChan <- struct{}{}
// wait for command to be executed
var queries []interface{}
require.Eventuallyf(t, func() bool {
queries = ruleStore.getRecordedCommands(func(cmd interface{}) (interface{}, bool) {
c, ok := cmd.(models.GetAlertRuleByUIDQuery)
return c, ok
})
return len(queries) == 1
}, 5*time.Second, 100*time.Millisecond, "Expected command a single %T to be recorded. All recordings: %#v", models.GetAlertRuleByUIDQuery{}, ruleStore.recordedOps)
m := queries[0].(models.GetAlertRuleByUIDQuery)
require.Equal(t, rule.UID, m.UID)
require.Equal(t, rule.OrgID, m.OrgID)
// now trigger the evaluation loop to make sure the fetched rule is reused and not requested from the database again
evalChan <- &evalContext{
now: time.UnixMicro(rand.Int63()),
version: rule.Version,
}
waitForTimeChannel(t, evalAppliedChan)
queries = ruleStore.getRecordedCommands(func(cmd interface{}) (interface{}, bool) {
c, ok := cmd.(models.GetAlertRuleByUIDQuery)
return c, ok
})
require.Lenf(t, queries, 1, "evaluation loop requested the rule from the database but it should not have")
})
t.Run("should retry when database fails", func(t *testing.T) {
evalAppliedChan := make(chan time.Time)
updateChan := make(chan struct{})
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
sch.maxAttempts = rand.Int63n(4) + 1
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evalContext), updateChan)
}()
ruleStore.hook = func(cmd interface{}) error {
if _, ok := cmd.(models.GetAlertRuleByUIDQuery); !ok {
return nil
}
return errors.New("TEST")
}
updateChan <- struct{}{}
var queries []interface{}
require.Eventuallyf(t, func() bool {
queries = ruleStore.getRecordedCommands(func(cmd interface{}) (interface{}, bool) {
c, ok := cmd.(models.GetAlertRuleByUIDQuery)
return c, ok
})
return int64(len(queries)) == sch.maxAttempts
}, 5*time.Second, 100*time.Millisecond, "Expected exactly two request of %T. All recordings: %#v", models.GetAlertRuleByUIDQuery{}, ruleStore.recordedOps)
})
})
t.Run("when rule version is updated", func(t *testing.T) {
t.Run("should clear the state and expire firing alerts", func(t *testing.T) {
fakeAM := NewFakeExternalAlertmanager(t)
defer fakeAM.Close()
orgID := rand.Int63()
s, err := sender.New(nil)
require.NoError(t, err)
adminConfig := &models.AdminConfiguration{OrgID: orgID, Alertmanagers: []string{fakeAM.server.URL}}
err = s.ApplyConfig(adminConfig)
require.NoError(t, err)
s.Run()
defer s.Stop()
require.Eventuallyf(t, func() bool {
return len(s.Alertmanagers()) == 1
}, 20*time.Second, 200*time.Millisecond, "external Alertmanager was not discovered.")
evalChan := make(chan *evalContext)
evalAppliedChan := make(chan time.Time)
updateChan := make(chan struct{})
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
sch.senders[orgID] = s
var rulePtr = CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting) // we want the alert to fire
var rule = *rulePtr
// define some state
states := make([]*state.State, 0, len(allStates))
for _, s := range allStates {
for i := 0; i < 2; i++ {
states = append(states, &state.State{
AlertRuleUID: rule.UID,
CacheId: util.GenerateShortUID(),
OrgID: rule.OrgID,
State: s,
StartsAt: sch.clock.Now(),
EndsAt: sch.clock.Now().Add(time.Duration(rand.Intn(25)+5) * time.Second),
Labels: rule.Labels,
})
}
}
sch.stateManager.Put(states)
states = sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
expectedToBeSent := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
require.NotEmptyf(t, expectedToBeSent.PostableAlerts, "State manager was expected to return at least one state that can be expired")
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan)
}()
wg := sync.WaitGroup{}
wg.Add(1)
ruleStore.hook = func(cmd interface{}) error {
_, ok := cmd.(models.GetAlertRuleByUIDQuery)
if ok {
wg.Done() // add synchronization.
}
return nil
}
updateChan <- struct{}{}
wg.Wait()
newRule := rule
newRule.Version++
ruleStore.putRule(&newRule)
wg.Add(1)
updateChan <- struct{}{}
wg.Wait()
require.Eventually(t, func() bool {
return len(sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)) == 0
}, 5*time.Second, 100*time.Millisecond)
var count int
require.Eventuallyf(t, func() bool {
count = fakeAM.AlertsCount()
return count == len(expectedToBeSent.PostableAlerts)
}, 20*time.Second, 200*time.Millisecond, "Alertmanager was expected to receive %d alerts, but received only %d", len(expectedToBeSent.PostableAlerts), count)
for _, alert := range fakeAM.alerts {
require.Equalf(t, sch.clock.Now().UTC(), time.Time(alert.EndsAt).UTC(), "Alert received by Alertmanager should be expired as of now")
}
})
})
t.Run("when evaluation fails", func(t *testing.T) {
t.Run("it should increase failure counter", func(t *testing.T) {
t.Skip()
@@ -529,7 +701,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{}))
}()
evalChan <- &evalContext{
@@ -554,6 +726,19 @@ func TestSchedule_ruleRoutine(t *testing.T) {
func TestSchedule_alertRuleInfo(t *testing.T) {
t.Run("when rule evaluation is not stopped", func(t *testing.T) {
t.Run("Update should send to updateCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
resultCh := make(chan bool)
go func() {
resultCh <- r.update()
}()
select {
case <-r.updateCh:
require.True(t, <-resultCh)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on update channel")
}
})
t.Run("eval should send to evalCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
expected := time.Now()
@@ -588,6 +773,11 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
})
})
t.Run("when rule evaluation is stopped", func(t *testing.T) {
t.Run("Update should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
require.False(t, r.update())
})
t.Run("eval should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
@@ -606,7 +796,9 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
for {
select {
case <-r.evalCh:
time.Sleep(time.Millisecond)
time.Sleep(time.Microsecond)
case <-r.updateCh:
time.Sleep(time.Microsecond)
case <-r.ctx.Done():
return
}
@@ -617,14 +809,16 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
wg.Add(1)
go func() {
for i := 0; i < 20; i++ {
max := 2
max := 3
if i <= 10 {
max = 1
max = 2
}
switch rand.Intn(max) + 1 {
case 1:
r.eval(time.Now(), rand.Int63())
r.update()
case 2:
r.eval(time.Now(), rand.Int63())
case 3:
r.stop()
}
}
@@ -636,6 +830,86 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
})
}
func TestSchedule_UpdateAlertRule(t *testing.T) {
t.Run("when rule exists", func(t *testing.T) {
t.Run("it should call Update", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t)
key := generateRuleKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
go func() {
sch.UpdateAlertRule(key)
}()
select {
case <-info.updateCh:
case <-time.After(5 * time.Second):
t.Fatal("No message was received on update channel")
}
})
t.Run("should exit if it is closed", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t)
key := generateRuleKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
info.stop()
sch.UpdateAlertRule(key)
})
})
t.Run("when rule does not exist", func(t *testing.T) {
t.Run("should exit", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t)
key := generateRuleKey()
sch.UpdateAlertRule(key)
})
})
}
func TestSchedule_DeleteAlertRule(t *testing.T) {
t.Run("when rule exists", func(t *testing.T) {
t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t)
key := generateRuleKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
sch.DeleteAlertRule(key)
require.False(t, info.update())
require.False(t, info.eval(time.Now(), 1))
require.False(t, sch.registry.exists(key))
})
t.Run("should remove controller from registry", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t)
key := generateRuleKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
info.stop()
sch.DeleteAlertRule(key)
require.False(t, info.update())
require.False(t, info.eval(time.Now(), 1))
require.False(t, sch.registry.exists(key))
})
})
t.Run("when rule does not exist", func(t *testing.T) {
t.Run("should exit", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t)
key := generateRuleKey()
sch.DeleteAlertRule(key)
})
})
}
func generateRuleKey() models.AlertRuleKey {
return models.AlertRuleKey{
OrgID: rand.Int63(),
UID: util.GenerateShortUID(),
}
}
func setupSchedulerWithFakeStores(t *testing.T) *schedule {
t.Helper()
ruleStore := newFakeRuleStore(t)
instanceStore := &FakeInstanceStore{}
adminConfigStore := newFakeAdminConfigStore(t)
sch, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore, nil)
return sch
}
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore, registry *prometheus.Registry) (*schedule, *clock.Mock) {
t.Helper()

View File

@@ -51,7 +51,13 @@ func waitForErrChannel(t *testing.T, ch chan error) error {
}
func newFakeRuleStore(t *testing.T) *fakeRuleStore {
return &fakeRuleStore{t: t, rules: map[int64]map[string]map[string][]*models.AlertRule{}}
return &fakeRuleStore{
t: t,
rules: map[int64]map[string]map[string][]*models.AlertRule{},
hook: func(interface{}) error {
return nil
},
}
}
// FakeRuleStore mocks the RuleStore of the scheduler.
@@ -59,6 +65,7 @@ type fakeRuleStore struct {
t *testing.T
mtx sync.Mutex
rules map[int64]map[string]map[string][]*models.AlertRule
hook func(cmd interface{}) error // use hook if you need to intercept some query and return an error
recordedOps []interface{}
}
@@ -71,6 +78,22 @@ func (f *fakeRuleStore) putRule(r *models.AlertRule) {
}
}
// getRecordedCommands filters recorded commands using predicate function. Returns the subset of the recorded commands that meet the predicate
func (f *fakeRuleStore) getRecordedCommands(predicate func(cmd interface{}) (interface{}, bool)) []interface{} {
f.mtx.Lock()
defer f.mtx.Unlock()
result := make([]interface{}, 0, len(f.recordedOps))
for _, op := range f.recordedOps {
cmd, ok := predicate(op)
if !ok {
continue
}
result = append(result, cmd)
}
return result
}
func (f *fakeRuleStore) DeleteAlertRuleByUID(_ int64, _ string) error { return nil }
func (f *fakeRuleStore) DeleteNamespaceAlertRules(_ int64, _ string) ([]string, error) {
return []string{}, nil
@@ -83,6 +106,9 @@ func (f *fakeRuleStore) GetAlertRuleByUID(q *models.GetAlertRuleByUIDQuery) erro
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
if err := f.hook(*q); err != nil {
return err
}
rgs, ok := f.rules[q.OrgID]
if !ok {
return nil
@@ -107,6 +133,9 @@ func (f *fakeRuleStore) GetAlertRulesForScheduling(q *models.ListAlertRulesQuery
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
if err := f.hook(*q); err != nil {
return err
}
for _, rg := range f.rules {
for _, n := range rg {
for _, r := range n {
@@ -133,6 +162,9 @@ func (f *fakeRuleStore) GetRuleGroupAlertRules(q *models.ListRuleGroupAlertRules
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
if err := f.hook(*q); err != nil {
return err
}
rgs, ok := f.rules[q.OrgID]
if !ok {
return nil
@@ -168,6 +200,9 @@ func (f *fakeRuleStore) GetOrgRuleGroups(q *models.ListOrgRuleGroupsQuery) error
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
if err := f.hook(*q); err != nil {
return err
}
return nil
}
@@ -175,12 +210,18 @@ func (f *fakeRuleStore) UpsertAlertRules(q []store.UpsertRule) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, q)
if err := f.hook(q); err != nil {
return err
}
return nil
}
func (f *fakeRuleStore) UpdateRuleGroup(cmd store.UpdateRuleGroupCmd) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, cmd)
if err := f.hook(cmd); err != nil {
return err
}
rgs, ok := f.rules[cmd.OrgID]
if !ok {
f.rules[cmd.OrgID] = map[string]map[string][]*models.AlertRule{}