From ed5c664e4ab09184dc8e24e0287142cf67bb42c7 Mon Sep 17 00:00:00 2001
From: Yuriy Tseretyan
Date: Tue, 11 Jan 2022 11:39:34 -0500
Subject: [PATCH] Alerting: Stop firing of alert when it is updated (#39975)

* Update the API to call the scheduler to remove/update an alert rule. When a
  rule is updated by a user, the scheduler removes the currently firing alert
  instances and cleans up the state cache.
* Update the evaluation loop in the scheduler to support one more channel that
  is used to communicate updates to it.
* Improve rule deletion from the internal registry.
* Move the alert rule version from the internal registry (structure
  alertRuleInfo) closer to the rule evaluation loop (to the evaluation task
  structure), which makes the registry values immutable.
* Extract the notification code to a separate function so it can be reused in
  the update flow.
---
 pkg/services/ngalert/api/api.go               |   2 +-
 pkg/services/ngalert/api/api_ruler.go         |  29 +-
 pkg/services/ngalert/schedule/compat.go       |  17 +
 pkg/services/ngalert/schedule/compat_test.go  |  32 ++
 pkg/services/ngalert/schedule/schedule.go     | 180 ++++++++---
 .../ngalert/schedule/schedule_unit_test.go    | 292 +++++++++++++++++-
 pkg/services/ngalert/schedule/testing.go      |  43 ++-
 7 files changed, 526 insertions(+), 69 deletions(-)

diff --git a/pkg/services/ngalert/api/api.go b/pkg/services/ngalert/api/api.go
index 166711ebe88..328baa4e9ac 100644
--- a/pkg/services/ngalert/api/api.go
+++ b/pkg/services/ngalert/api/api.go
@@ -95,7 +95,7 @@ func (api *API) RegisterAPIEndpoints(m *metrics.API) {
 	api.RegisterRulerApiEndpoints(NewForkedRuler(
 		api.DatasourceCache,
 		NewLotexRuler(proxy, logger),
-		RulerSrv{DatasourceCache: api.DatasourceCache, QuotaService: api.QuotaService, manager: api.StateManager, store: api.RuleStore, log: logger},
+		RulerSrv{DatasourceCache: api.DatasourceCache, QuotaService: api.QuotaService, scheduleService: api.Schedule, store: api.RuleStore, log: logger},
 	), m)
 	api.RegisterTestingApiEndpoints(NewForkedTestingApi(
 		TestingApiSrv{
diff --git a/pkg/services/ngalert/api/api_ruler.go b/pkg/services/ngalert/api/api_ruler.go
index 980fe410ee4..5268f72a6c1 100644
--- a/pkg/services/ngalert/api/api_ruler.go
+++ b/pkg/services/ngalert/api/api_ruler.go
@@ -6,26 +6,28 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/grafana/grafana/pkg/services/datasources"
+	"github.com/grafana/grafana/pkg/services/ngalert/store"
+	"github.com/grafana/grafana/pkg/services/quota"
+
+	"github.com/prometheus/common/model"
+
 	"github.com/grafana/grafana/pkg/api/apierrors"
 	"github.com/grafana/grafana/pkg/api/response"
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/models"
-	"github.com/grafana/grafana/pkg/services/datasources"
 	apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
 	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
-	"github.com/grafana/grafana/pkg/services/ngalert/state"
-	"github.com/grafana/grafana/pkg/services/ngalert/store"
-	"github.com/grafana/grafana/pkg/services/quota"
+	"github.com/grafana/grafana/pkg/services/ngalert/schedule"
 	"github.com/grafana/grafana/pkg/util"
 	"github.com/grafana/grafana/pkg/web"
-	"github.com/prometheus/common/model"
 )
 
 type RulerSrv struct {
 	store           store.RuleStore
 	DatasourceCache datasources.CacheService
 	QuotaService    *quota.QuotaService
-	manager         *state.Manager
+	scheduleService schedule.ScheduleService
 	log             log.Logger
 }
 
@@ -42,7 +44,10 @@ func (srv RulerSrv) RouteDeleteNamespaceRulesConfig(c *models.ReqContext) respon
 	}
 
 	for _, uid := range uids {
-
srv.manager.RemoveByRuleUID(c.SignedInUser.OrgId, uid) + srv.scheduleService.DeleteAlertRule(ngmodels.AlertRuleKey{ + OrgID: c.SignedInUser.OrgId, + UID: uid, + }) } return response.JSON(http.StatusAccepted, util.DynMap{"message": "namespace rules deleted"}) @@ -65,7 +70,10 @@ func (srv RulerSrv) RouteDeleteRuleGroupConfig(c *models.ReqContext) response.Re } for _, uid := range uids { - srv.manager.RemoveByRuleUID(c.SignedInUser.OrgId, uid) + srv.scheduleService.DeleteAlertRule(ngmodels.AlertRuleKey{ + OrgID: c.SignedInUser.OrgId, + UID: uid, + }) } return response.JSON(http.StatusAccepted, util.DynMap{"message": "rule group deleted"}) @@ -288,7 +296,10 @@ func (srv RulerSrv) RoutePostNameRulesConfig(c *models.ReqContext, ruleGroupConf } for uid := range alertRuleUIDs { - srv.manager.RemoveByRuleUID(c.OrgId, uid) + srv.scheduleService.UpdateAlertRule(ngmodels.AlertRuleKey{ + OrgID: c.SignedInUser.OrgId, + UID: uid, + }) } return response.JSON(http.StatusAccepted, util.DynMap{"message": "rule group updated successfully"}) diff --git a/pkg/services/ngalert/schedule/compat.go b/pkg/services/ngalert/schedule/compat.go index a382570795f..17dc9057f6a 100644 --- a/pkg/services/ngalert/schedule/compat.go +++ b/pkg/services/ngalert/schedule/compat.go @@ -6,6 +6,7 @@ import ( "path" "time" + "github.com/benbjohnson/clock" "github.com/go-openapi/strfmt" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/prometheus/alertmanager/api/v2/models" @@ -125,3 +126,19 @@ func FromAlertStateToPostableAlerts(firingStates []*state.State, stateManager *s stateManager.Put(sentAlerts) return alerts } + +// FromAlertsStateToStoppedAlert converts firingStates that have evaluation state either eval.Alerting or eval.NoData or eval.Error to models.PostableAlert that are accepted by notifiers. 
+// Returns a list of alert instances that have expiration time.Now +func FromAlertsStateToStoppedAlert(firingStates []*state.State, appURL *url.URL, clock clock.Clock) apimodels.PostableAlerts { + alerts := apimodels.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(firingStates))} + ts := clock.Now() + for _, alertState := range firingStates { + if alertState.State == eval.Normal || alertState.State == eval.Pending { + continue + } + postableAlert := stateToPostableAlert(alertState, appURL) + postableAlert.EndsAt = strfmt.DateTime(ts) + alerts.PostableAlerts = append(alerts.PostableAlerts, *postableAlert) + } + return alerts +} diff --git a/pkg/services/ngalert/schedule/compat_test.go b/pkg/services/ngalert/schedule/compat_test.go index e8eafc98ba7..704fbaff730 100644 --- a/pkg/services/ngalert/schedule/compat_test.go +++ b/pkg/services/ngalert/schedule/compat_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/benbjohnson/clock" "github.com/go-openapi/strfmt" "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/common/model" @@ -193,6 +194,37 @@ func Test_stateToPostableAlert(t *testing.T) { } } +func Test_FromAlertsStateToStoppedAlert(t *testing.T) { + appURL := &url.URL{ + Scheme: "http:", + Host: fmt.Sprintf("host-%d", rand.Int()), + Path: fmt.Sprintf("path-%d", rand.Int()), + } + + evalStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.Error, eval.NoData} + states := make([]*state.State, 0, len(evalStates)) + for _, s := range evalStates { + states = append(states, randomState(s)) + } + + clk := clock.NewMock() + clk.Set(time.Now()) + + expected := make([]models.PostableAlert, 0, len(states)) + for _, s := range states { + if !(s.State == eval.Alerting || s.State == eval.Error || s.State == eval.NoData) { + continue + } + alert := stateToPostableAlert(s, appURL) + alert.EndsAt = strfmt.DateTime(clk.Now()) + expected = append(expected, *alert) + } + + result := FromAlertsStateToStoppedAlert(states, appURL, clk) + + require.Equal(t, expected, result.PostableAlerts) +} + func randomMapOfStrings() map[string]string { max := 5 result := make(map[string]string, max) diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index 1d54285b0ce..5ceef7a892e 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/alerting" + "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/services/ngalert/models" @@ -39,7 +40,10 @@ type ScheduleService interface { // DroppedAlertmanagersFor returns all the dropped Alertmanager URLs for the // organization. 
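An aside on the compat.go helper added above: FromAlertsStateToStoppedAlert only has to stamp every non-Normal, non-Pending state with EndsAt = now so that the receiving Alertmanager resolves the alert. The following self-contained sketch illustrates that idea with simplified stand-in types; evalState, ruleState, postableAlert and stoppedAlerts are illustrative names, not the real ngalert types.

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for eval.State, state.State and models.PostableAlert.
type evalState int

const (
	stateNormal evalState = iota
	statePending
	stateAlerting
	stateNoData
	stateError
)

type ruleState struct {
	Labels map[string]string
	State  evalState
}

type postableAlert struct {
	Labels map[string]string
	EndsAt time.Time
}

// stoppedAlerts mirrors the idea behind FromAlertsStateToStoppedAlert: skip
// Normal and Pending states and stamp everything else with EndsAt = now so the
// receiving Alertmanager treats those alerts as resolved.
func stoppedAlerts(states []ruleState, now time.Time) []postableAlert {
	alerts := make([]postableAlert, 0, len(states))
	for _, s := range states {
		if s.State == stateNormal || s.State == statePending {
			continue
		}
		alerts = append(alerts, postableAlert{Labels: s.Labels, EndsAt: now})
	}
	return alerts
}

func main() {
	states := []ruleState{
		{Labels: map[string]string{"alertname": "a"}, State: stateAlerting},
		{Labels: map[string]string{"alertname": "b"}, State: statePending},
		{Labels: map[string]string{"alertname": "c"}, State: stateError},
	}
	for _, a := range stoppedAlerts(states, time.Now()) {
		fmt.Println(a.Labels["alertname"], "ends at", a.EndsAt.Format(time.RFC3339))
	}
}

The real helper takes a clock.Clock rather than calling time.Now directly, which is what lets Test_FromAlertsStateToStoppedAlert above pin the expected EndsAt with a mocked clock.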
DroppedAlertmanagersFor(orgID int64) []*url.URL - + // UpdateAlertRule notifies scheduler that a rule has been changed + UpdateAlertRule(key models.AlertRuleKey) + // DeleteAlertRule notifies scheduler that a rule has been changed + DeleteAlertRule(key models.AlertRuleKey) // the following are used by tests only used for tests evalApplied(models.AlertRuleKey, time.Time) stopApplied(models.AlertRuleKey) @@ -307,6 +311,26 @@ func (sch *schedule) DroppedAlertmanagersFor(orgID int64) []*url.URL { return s.DroppedAlertmanagers() } +// UpdateAlertRule looks for the active rule evaluation and commands it to update the rule +func (sch *schedule) UpdateAlertRule(key models.AlertRuleKey) { + ruleInfo, err := sch.registry.get(key) + if err != nil { + return + } + ruleInfo.update() +} + +// DeleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache. +func (sch *schedule) DeleteAlertRule(key models.AlertRuleKey) { + ruleInfo, ok := sch.registry.del(key) + if !ok { + sch.log.Info("unable to delete alert rule routine information by key", "uid", key.UID, "org_id", key.OrgID) + return + } + // stop rule evaluation + ruleInfo.stop() +} + func (sch *schedule) adminConfigSync(ctx context.Context) error { for { select { @@ -369,7 +393,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error { if newRoutine && !invalidInterval { dispatcherGroup.Go(func() error { - return sch.ruleRoutine(ruleInfo.ctx, key, ruleInfo.evalCh) + return sch.ruleRoutine(ruleInfo.ctx, key, ruleInfo.evalCh, ruleInfo.updateCh) }) } @@ -407,12 +431,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error { // unregister and stop routines of the deleted alert rules for key := range registeredDefinitions { - ruleInfo, ok := sch.registry.del(key) - if !ok { - sch.log.Error("unable to delete alert rule routine information because it did not exist", "uid", key.UID, "org_id", key.OrgID) - continue - } - ruleInfo.stop() + sch.DeleteAlertRule(key) } case <-ctx.Done(): waitErr := dispatcherGroup.Wait() @@ -432,7 +451,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error { } } -func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext) error { +func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, updateCh <-chan struct{}) error { logger := sch.log.New("uid", key.UID, "org", key.OrgID) logger.Debug("alert rule routine started") @@ -441,13 +460,60 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID) evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID) - updateRule := func() (*models.AlertRule, error) { + notify := func(alerts definitions.PostableAlerts, logger log.Logger) { + if len(alerts.PostableAlerts) == 0 { + logger.Debug("no alerts to put in the notifier") + return + } + + var localNotifierExist, externalNotifierExist bool + logger.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts) + n, err := sch.multiOrgNotifier.AlertmanagerFor(key.OrgID) + if err == nil { + localNotifierExist = true + if err := n.PutAlerts(alerts); err != nil { + logger.Error("failed to put alerts in the local notifier", "count", len(alerts.PostableAlerts), "err", err) + } + } else { + if errors.Is(err, notifier.ErrNoAlertmanagerForOrg) { + logger.Debug("local notifier was not found") + } else { + 
logger.Error("local notifier is not available", "err", err) + } + } + + // Send alerts to external Alertmanager(s) if we have a sender for this organization. + sch.sendersMtx.RLock() + defer sch.sendersMtx.RUnlock() + s, ok := sch.senders[key.OrgID] + if ok { + logger.Debug("sending alerts to external notifier", "count", len(alerts.PostableAlerts)) + s.SendAlerts(alerts) + externalNotifierExist = true + } + + if !localNotifierExist && !externalNotifierExist { + logger.Error("no external or internal notifier - alerts not delivered!", "count", len(alerts.PostableAlerts)) + } + } + + clearState := func() { + states := sch.stateManager.GetStatesForRuleUID(key.OrgID, key.UID) + expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock) + sch.stateManager.RemoveByRuleUID(key.OrgID, key.UID) + notify(expiredAlerts, logger) + } + + updateRule := func(oldRule *models.AlertRule) (*models.AlertRule, error) { q := models.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID} err := sch.ruleStore.GetAlertRuleByUID(&q) if err != nil { logger.Error("failed to fetch alert rule", "err", err) return nil, err } + if oldRule != nil && oldRule.Version < q.Result.Version { + clearState() + } return q.Result, nil } @@ -476,40 +542,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul sch.saveAlertStates(processedStates) alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL) - if len(alerts.PostableAlerts) == 0 { - logger.Debug("no alerts to put in the notifier") - return nil - } - - var localNotifierExist, externalNotifierExist bool - logger.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts) - n, err := sch.multiOrgNotifier.AlertmanagerFor(alertRule.OrgID) - if err == nil { - localNotifierExist = true - if err := n.PutAlerts(alerts); err != nil { - logger.Error("failed to put alerts in the local notifier", "count", len(alerts.PostableAlerts), "err", err) - } - } else { - if errors.Is(err, notifier.ErrNoAlertmanagerForOrg) { - logger.Debug("local notifier was not found") - } else { - logger.Error("local notifier is not available", "err", err) - } - } - - // Send alerts to external Alertmanager(s) if we have a sender for this organization. - sch.sendersMtx.RLock() - defer sch.sendersMtx.RUnlock() - s, ok := sch.senders[alertRule.OrgID] - if ok { - logger.Debug("sending alerts to external notifier", "count", len(alerts.PostableAlerts)) - s.SendAlerts(alerts) - externalNotifierExist = true - } - - if !localNotifierExist && !externalNotifierExist { - logger.Error("no external or internal notifier - alerts not delivered!", "count", len(alerts.PostableAlerts)) - } + notify(alerts, logger) return nil } @@ -530,6 +563,22 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul defer sch.stopApplied(key) for { select { + // used by external services (API) to notify that rule is updated. + case <-updateCh: + logger.Info("fetching new version of the rule") + err := retryIfError(func(attempt int64) error { + newRule, err := updateRule(currentRule) + if err != nil { + return err + } + logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version) + currentRule = newRule + return nil + }) + if err != nil { + logger.Error("updating rule failed after all retries", "error", err) + } + // evalCh - used by the scheduler to signal that evaluation is needed. 
case ctx, ok := <-evalCh: if !ok { logger.Debug("Evaluation channel has been closed. Exiting") @@ -549,7 +598,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul err := retryIfError(func(attempt int64) error { // fetch latest alert rule version if currentRule == nil || currentRule.Version < ctx.version { - newRule, err := updateRule() + newRule, err := updateRule(currentRule) if err != nil { return err } @@ -563,6 +612,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul } }() case <-grafanaCtx.Done(): + clearState() logger.Debug("stopping alert rule routine") return nil } @@ -607,6 +657,27 @@ func (r *alertRuleRegistry) getOrCreateInfo(context context.Context, key models. return info, !ok } +// get returns the channel for the specific alert rule +// if the key does not exist returns an error +func (r *alertRuleRegistry) get(key models.AlertRuleKey) (*alertRuleInfo, error) { + r.mu.Lock() + defer r.mu.Unlock() + + info, ok := r.alertRuleInfo[key] + if !ok { + return nil, fmt.Errorf("%v key not found", key) + } + return info, nil +} + +func (r *alertRuleRegistry) exists(key models.AlertRuleKey) bool { + r.mu.Lock() + defer r.mu.Unlock() + + _, ok := r.alertRuleInfo[key] + return ok +} + // del removes pair that has specific key from alertRuleInfo. // Returns 2-tuple where the first element is value of the removed pair // and the second element indicates whether element with the specified key existed. @@ -646,14 +717,15 @@ func (r *alertRuleRegistry) keyMap() map[models.AlertRuleKey]struct{} { } type alertRuleInfo struct { - evalCh chan *evalContext - ctx context.Context - stop context.CancelFunc + evalCh chan *evalContext + updateCh chan struct{} + ctx context.Context + stop context.CancelFunc } func newAlertRuleInfo(parent context.Context) *alertRuleInfo { ctx, cancel := context.WithCancel(parent) - return &alertRuleInfo{evalCh: make(chan *evalContext), ctx: ctx, stop: cancel} + return &alertRuleInfo{evalCh: make(chan *evalContext), updateCh: make(chan struct{}), ctx: ctx, stop: cancel} } // eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped @@ -669,6 +741,16 @@ func (a *alertRuleInfo) eval(t time.Time, version int64) bool { } } +// update signals the rule evaluation routine to update the internal state. 
Does nothing if the loop is stopped +func (a *alertRuleInfo) update() bool { + select { + case a.updateCh <- struct{}{}: + return true + case <-a.ctx.Done(): + return false + } +} + type evalContext struct { now time.Time version int64 diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index bbc7973242a..a52fb7330b7 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "math/rand" "net/url" @@ -33,6 +34,7 @@ import ( "github.com/grafana/grafana/pkg/services/secrets/fakes" secretsManager "github.com/grafana/grafana/pkg/services/secrets/manager" "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/util" ) func TestSendingToExternalAlertmanager(t *testing.T) { @@ -259,6 +261,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { // normal states do not include NoData and Error because currently it is not possible to perform any sensible test normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending} + allStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.NoData, eval.Error} randomNormalState := func() eval.State { // pick only supported cases return normalStates[rand.Intn(3)] @@ -276,7 +279,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { go func() { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{})) }() expectedTime := time.UnixMicro(rand.Int63()) @@ -373,7 +376,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) go func() { - err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext)) + err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext), make(chan struct{})) stoppedChan <- err }() @@ -394,7 +397,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { go func() { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{})) }() expectedTime := time.UnixMicro(rand.Int63()) @@ -446,7 +449,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { go func() { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{})) }() expectedTime := time.UnixMicro(rand.Int63()) @@ -485,6 +488,175 @@ func TestSchedule_ruleRoutine(t *testing.T) { require.Len(t, queries, 1, "Expected exactly one request of %T", models.GetAlertRuleByUIDQuery{}) }) + t.Run("when update channel is not empty", func(t *testing.T) { + t.Run("should fetch the alert rule from database", func(t *testing.T) { + evalChan := make(chan *evalContext) + evalAppliedChan := make(chan time.Time) + updateChan := make(chan struct{}) + + sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan) + + rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), eval.Alerting) // we want the alert to fire + + go func() { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan) + }() + updateChan <- struct{}{} + + // wait for command to be executed + var queries []interface{} + require.Eventuallyf(t, func() 
bool { + queries = ruleStore.getRecordedCommands(func(cmd interface{}) (interface{}, bool) { + c, ok := cmd.(models.GetAlertRuleByUIDQuery) + return c, ok + }) + return len(queries) == 1 + }, 5*time.Second, 100*time.Millisecond, "Expected command a single %T to be recorded. All recordings: %#v", models.GetAlertRuleByUIDQuery{}, ruleStore.recordedOps) + + m := queries[0].(models.GetAlertRuleByUIDQuery) + require.Equal(t, rule.UID, m.UID) + require.Equal(t, rule.OrgID, m.OrgID) + + // now call evaluation loop to make sure that the rule was persisted + evalChan <- &evalContext{ + now: time.UnixMicro(rand.Int63()), + version: rule.Version, + } + waitForTimeChannel(t, evalAppliedChan) + + queries = ruleStore.getRecordedCommands(func(cmd interface{}) (interface{}, bool) { + c, ok := cmd.(models.GetAlertRuleByUIDQuery) + return c, ok + }) + require.Lenf(t, queries, 1, "evaluation loop requested a rule from database but it should not be") + }) + + t.Run("should retry when database fails", func(t *testing.T) { + evalAppliedChan := make(chan time.Time) + updateChan := make(chan struct{}) + + sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan) + sch.maxAttempts = rand.Int63n(4) + 1 + + rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState()) + + go func() { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + _ = sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evalContext), updateChan) + }() + + ruleStore.hook = func(cmd interface{}) error { + if _, ok := cmd.(models.GetAlertRuleByUIDQuery); !ok { + return nil + } + return errors.New("TEST") + } + updateChan <- struct{}{} + + var queries []interface{} + require.Eventuallyf(t, func() bool { + queries = ruleStore.getRecordedCommands(func(cmd interface{}) (interface{}, bool) { + c, ok := cmd.(models.GetAlertRuleByUIDQuery) + return c, ok + }) + return int64(len(queries)) == sch.maxAttempts + }, 5*time.Second, 100*time.Millisecond, "Expected exactly two request of %T. 
All recordings: %#v", models.GetAlertRuleByUIDQuery{}, ruleStore.recordedOps) + }) + }) + + t.Run("when rule version is updated", func(t *testing.T) { + t.Run("should clear the state and expire firing alerts", func(t *testing.T) { + fakeAM := NewFakeExternalAlertmanager(t) + defer fakeAM.Close() + + orgID := rand.Int63() + s, err := sender.New(nil) + require.NoError(t, err) + adminConfig := &models.AdminConfiguration{OrgID: orgID, Alertmanagers: []string{fakeAM.server.URL}} + err = s.ApplyConfig(adminConfig) + require.NoError(t, err) + s.Run() + defer s.Stop() + + require.Eventuallyf(t, func() bool { + return len(s.Alertmanagers()) == 1 + }, 20*time.Second, 200*time.Millisecond, "external Alertmanager was not discovered.") + + evalChan := make(chan *evalContext) + evalAppliedChan := make(chan time.Time) + updateChan := make(chan struct{}) + + sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan) + sch.senders[orgID] = s + + var rulePtr = CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting) // we want the alert to fire + var rule = *rulePtr + + // define some state + states := make([]*state.State, 0, len(allStates)) + for _, s := range allStates { + for i := 0; i < 2; i++ { + states = append(states, &state.State{ + AlertRuleUID: rule.UID, + CacheId: util.GenerateShortUID(), + OrgID: rule.OrgID, + State: s, + StartsAt: sch.clock.Now(), + EndsAt: sch.clock.Now().Add(time.Duration(rand.Intn(25)+5) * time.Second), + Labels: rule.Labels, + }) + } + } + sch.stateManager.Put(states) + states = sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) + expectedToBeSent := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock) + require.NotEmptyf(t, expectedToBeSent.PostableAlerts, "State manger was expected to return at least one state that can be expired") + + go func() { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan) + }() + + wg := sync.WaitGroup{} + wg.Add(1) + ruleStore.hook = func(cmd interface{}) error { + _, ok := cmd.(models.GetAlertRuleByUIDQuery) + if ok { + wg.Done() // add synchronization. 
+ } + return nil + } + + updateChan <- struct{}{} + + wg.Wait() + newRule := rule + newRule.Version++ + ruleStore.putRule(&newRule) + wg.Add(1) + updateChan <- struct{}{} + wg.Wait() + + require.Eventually(t, func() bool { + return len(sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)) == 0 + }, 5*time.Second, 100*time.Millisecond) + + var count int + require.Eventuallyf(t, func() bool { + count = fakeAM.AlertsCount() + return count == len(expectedToBeSent.PostableAlerts) + }, 20*time.Second, 200*time.Millisecond, "Alertmanager was expected to receive %d alerts, but received only %d", len(expectedToBeSent.PostableAlerts), count) + + for _, alert := range fakeAM.alerts { + require.Equalf(t, sch.clock.Now().UTC(), time.Time(alert.EndsAt).UTC(), "Alert received by Alertmanager should be expired as of now") + } + }) + }) + t.Run("when evaluation fails", func(t *testing.T) { t.Run("it should increase failure counter", func(t *testing.T) { t.Skip() @@ -529,7 +701,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { go func() { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{})) }() evalChan <- &evalContext{ @@ -554,6 +726,19 @@ func TestSchedule_ruleRoutine(t *testing.T) { func TestSchedule_alertRuleInfo(t *testing.T) { t.Run("when rule evaluation is not stopped", func(t *testing.T) { + t.Run("Update should send to updateCh", func(t *testing.T) { + r := newAlertRuleInfo(context.Background()) + resultCh := make(chan bool) + go func() { + resultCh <- r.update() + }() + select { + case <-r.updateCh: + require.True(t, <-resultCh) + case <-time.After(5 * time.Second): + t.Fatal("No message was received on update channel") + } + }) t.Run("eval should send to evalCh", func(t *testing.T) { r := newAlertRuleInfo(context.Background()) expected := time.Now() @@ -588,6 +773,11 @@ func TestSchedule_alertRuleInfo(t *testing.T) { }) }) t.Run("when rule evaluation is stopped", func(t *testing.T) { + t.Run("Update should do nothing", func(t *testing.T) { + r := newAlertRuleInfo(context.Background()) + r.stop() + require.False(t, r.update()) + }) t.Run("eval should do nothing", func(t *testing.T) { r := newAlertRuleInfo(context.Background()) r.stop() @@ -606,7 +796,9 @@ func TestSchedule_alertRuleInfo(t *testing.T) { for { select { case <-r.evalCh: - time.Sleep(time.Millisecond) + time.Sleep(time.Microsecond) + case <-r.updateCh: + time.Sleep(time.Microsecond) case <-r.ctx.Done(): return } @@ -617,14 +809,16 @@ func TestSchedule_alertRuleInfo(t *testing.T) { wg.Add(1) go func() { for i := 0; i < 20; i++ { - max := 2 + max := 3 if i <= 10 { - max = 1 + max = 2 } switch rand.Intn(max) + 1 { case 1: - r.eval(time.Now(), rand.Int63()) + r.update() case 2: + r.eval(time.Now(), rand.Int63()) + case 3: r.stop() } } @@ -636,6 +830,86 @@ func TestSchedule_alertRuleInfo(t *testing.T) { }) } +func TestSchedule_UpdateAlertRule(t *testing.T) { + t.Run("when rule exists", func(t *testing.T) { + t.Run("it should call Update", func(t *testing.T) { + sch := setupSchedulerWithFakeStores(t) + key := generateRuleKey() + info, _ := sch.registry.getOrCreateInfo(context.Background(), key) + go func() { + sch.UpdateAlertRule(key) + }() + + select { + case <-info.updateCh: + case <-time.After(5 * time.Second): + t.Fatal("No message was received on update channel") + } + }) + t.Run("should exit if it is closed", func(t *testing.T) { + sch := setupSchedulerWithFakeStores(t) + key := 
generateRuleKey() + info, _ := sch.registry.getOrCreateInfo(context.Background(), key) + info.stop() + sch.UpdateAlertRule(key) + }) + }) + t.Run("when rule does not exist", func(t *testing.T) { + t.Run("should exit", func(t *testing.T) { + sch := setupSchedulerWithFakeStores(t) + key := generateRuleKey() + sch.UpdateAlertRule(key) + }) + }) +} + +func TestSchedule_DeleteAlertRule(t *testing.T) { + t.Run("when rule exists", func(t *testing.T) { + t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) { + sch := setupSchedulerWithFakeStores(t) + key := generateRuleKey() + info, _ := sch.registry.getOrCreateInfo(context.Background(), key) + sch.DeleteAlertRule(key) + require.False(t, info.update()) + require.False(t, info.eval(time.Now(), 1)) + require.False(t, sch.registry.exists(key)) + }) + t.Run("should remove controller from registry", func(t *testing.T) { + sch := setupSchedulerWithFakeStores(t) + key := generateRuleKey() + info, _ := sch.registry.getOrCreateInfo(context.Background(), key) + info.stop() + sch.DeleteAlertRule(key) + require.False(t, info.update()) + require.False(t, info.eval(time.Now(), 1)) + require.False(t, sch.registry.exists(key)) + }) + }) + t.Run("when rule does not exist", func(t *testing.T) { + t.Run("should exit", func(t *testing.T) { + sch := setupSchedulerWithFakeStores(t) + key := generateRuleKey() + sch.DeleteAlertRule(key) + }) + }) +} + +func generateRuleKey() models.AlertRuleKey { + return models.AlertRuleKey{ + OrgID: rand.Int63(), + UID: util.GenerateShortUID(), + } +} + +func setupSchedulerWithFakeStores(t *testing.T) *schedule { + t.Helper() + ruleStore := newFakeRuleStore(t) + instanceStore := &FakeInstanceStore{} + adminConfigStore := newFakeAdminConfigStore(t) + sch, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore, nil) + return sch +} + func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore, registry *prometheus.Registry) (*schedule, *clock.Mock) { t.Helper() diff --git a/pkg/services/ngalert/schedule/testing.go b/pkg/services/ngalert/schedule/testing.go index 516f3be54ec..353bc4424a3 100644 --- a/pkg/services/ngalert/schedule/testing.go +++ b/pkg/services/ngalert/schedule/testing.go @@ -51,7 +51,13 @@ func waitForErrChannel(t *testing.T, ch chan error) error { } func newFakeRuleStore(t *testing.T) *fakeRuleStore { - return &fakeRuleStore{t: t, rules: map[int64]map[string]map[string][]*models.AlertRule{}} + return &fakeRuleStore{ + t: t, + rules: map[int64]map[string]map[string][]*models.AlertRule{}, + hook: func(interface{}) error { + return nil + }, + } } // FakeRuleStore mocks the RuleStore of the scheduler. @@ -59,6 +65,7 @@ type fakeRuleStore struct { t *testing.T mtx sync.Mutex rules map[int64]map[string]map[string][]*models.AlertRule + hook func(cmd interface{}) error // use hook if you need to intercept some query and return an error recordedOps []interface{} } @@ -71,6 +78,22 @@ func (f *fakeRuleStore) putRule(r *models.AlertRule) { } } +// getRecordedCommands filters recorded commands using predicate function. 
Returns the subset of the recorded commands that meet the predicate +func (f *fakeRuleStore) getRecordedCommands(predicate func(cmd interface{}) (interface{}, bool)) []interface{} { + f.mtx.Lock() + defer f.mtx.Unlock() + + result := make([]interface{}, 0, len(f.recordedOps)) + for _, op := range f.recordedOps { + cmd, ok := predicate(op) + if !ok { + continue + } + result = append(result, cmd) + } + return result +} + func (f *fakeRuleStore) DeleteAlertRuleByUID(_ int64, _ string) error { return nil } func (f *fakeRuleStore) DeleteNamespaceAlertRules(_ int64, _ string) ([]string, error) { return []string{}, nil @@ -83,6 +106,9 @@ func (f *fakeRuleStore) GetAlertRuleByUID(q *models.GetAlertRuleByUIDQuery) erro f.mtx.Lock() defer f.mtx.Unlock() f.recordedOps = append(f.recordedOps, *q) + if err := f.hook(*q); err != nil { + return err + } rgs, ok := f.rules[q.OrgID] if !ok { return nil @@ -107,6 +133,9 @@ func (f *fakeRuleStore) GetAlertRulesForScheduling(q *models.ListAlertRulesQuery f.mtx.Lock() defer f.mtx.Unlock() f.recordedOps = append(f.recordedOps, *q) + if err := f.hook(*q); err != nil { + return err + } for _, rg := range f.rules { for _, n := range rg { for _, r := range n { @@ -133,6 +162,9 @@ func (f *fakeRuleStore) GetRuleGroupAlertRules(q *models.ListRuleGroupAlertRules f.mtx.Lock() defer f.mtx.Unlock() f.recordedOps = append(f.recordedOps, *q) + if err := f.hook(*q); err != nil { + return err + } rgs, ok := f.rules[q.OrgID] if !ok { return nil @@ -168,6 +200,9 @@ func (f *fakeRuleStore) GetOrgRuleGroups(q *models.ListOrgRuleGroupsQuery) error f.mtx.Lock() defer f.mtx.Unlock() f.recordedOps = append(f.recordedOps, *q) + if err := f.hook(*q); err != nil { + return err + } return nil } @@ -175,12 +210,18 @@ func (f *fakeRuleStore) UpsertAlertRules(q []store.UpsertRule) error { f.mtx.Lock() defer f.mtx.Unlock() f.recordedOps = append(f.recordedOps, q) + if err := f.hook(q); err != nil { + return err + } return nil } func (f *fakeRuleStore) UpdateRuleGroup(cmd store.UpdateRuleGroupCmd) error { f.mtx.Lock() defer f.mtx.Unlock() f.recordedOps = append(f.recordedOps, cmd) + if err := f.hook(cmd); err != nil { + return err + } rgs, ok := f.rules[cmd.OrgID] if !ok { f.rules[cmd.OrgID] = map[string]map[string][]*models.AlertRule{}
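The hook field added to fakeRuleStore is a small but useful test-double pattern: record every command and let the test intercept specific ones to inject failures (as the retry test does for GetAlertRuleByUIDQuery). Below is a minimal, self-contained sketch of the same pattern; fakeStore, getRuleQuery and recordedCommands are hypothetical names used only for illustration.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// fakeStore records every call and delegates to an overridable hook so a test
// can make a particular command fail, e.g. to exercise retry logic.
type fakeStore struct {
	mtx         sync.Mutex
	recordedOps []interface{}
	hook        func(cmd interface{}) error
}

func newFakeStore() *fakeStore {
	return &fakeStore{hook: func(interface{}) error { return nil }}
}

type getRuleQuery struct{ UID string }

func (f *fakeStore) GetRule(q getRuleQuery) error {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	f.recordedOps = append(f.recordedOps, q)
	return f.hook(q)
}

// recordedCommands returns the recorded operations selected by the predicate,
// mirroring getRecordedCommands in the patch.
func (f *fakeStore) recordedCommands(predicate func(cmd interface{}) bool) []interface{} {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	var result []interface{}
	for _, op := range f.recordedOps {
		if predicate(op) {
			result = append(result, op)
		}
	}
	return result
}

func main() {
	store := newFakeStore()
	// Fail every GetRule call, the way the retry test above fails GetAlertRuleByUIDQuery.
	store.hook = func(cmd interface{}) error {
		if _, ok := cmd.(getRuleQuery); ok {
			return errors.New("TEST")
		}
		return nil
	}
	_ = store.GetRule(getRuleQuery{UID: "abc"})
	_ = store.GetRule(getRuleQuery{UID: "abc"})
	count := len(store.recordedCommands(func(cmd interface{}) bool {
		_, ok := cmd.(getRuleQuery)
		return ok
	}))
	fmt.Println("recorded GetRule calls:", count)
}

Keeping the default hook as a no-op means existing tests are unaffected, while tests that care about error paths simply swap the hook in before triggering the code under test.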