2021-03-09 04:19:21 +08:00
package schedule
2020-12-17 22:00:09 +08:00
import (
"context"
"fmt"
"sync"
"time"
2021-03-31 00:37:56 +08:00
"github.com/benbjohnson/clock"
2021-04-20 02:26:04 +08:00
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
2021-05-15 04:13:44 +08:00
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
2021-04-19 14:58:44 +08:00
"golang.org/x/sync/errgroup"
2021-03-09 04:19:21 +08:00
2020-12-17 22:00:09 +08:00
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
2021-04-19 14:58:44 +08:00
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
2021-03-08 14:02:49 +08:00
"github.com/grafana/grafana/pkg/tsdb"
2020-12-17 22:00:09 +08:00
)
2021-03-09 04:19:21 +08:00
// timeNow returns the current time. It is a package-level variable instead of
// a direct time.Now call so that tests can substitute their own time source.
var timeNow = time.Now
// ScheduleService handles scheduling
type ScheduleService interface {
2021-04-24 03:32:25 +08:00
Ticker ( context . Context , * state . Manager ) error
2021-03-03 23:52:19 +08:00
Pause ( ) error
Unpause ( ) error
2021-04-24 03:32:25 +08:00
WarmStateCache ( * state . Manager )
2021-03-03 23:52:19 +08:00
// the following are used by tests only used for tests
2021-04-04 01:13:29 +08:00
evalApplied ( models . AlertRuleKey , time . Time )
stopApplied ( models . AlertRuleKey )
2021-03-09 04:19:21 +08:00
overrideCfg ( cfg SchedulerCfg )
2021-03-03 23:52:19 +08:00
}
2021-04-24 03:32:25 +08:00
func ( sch * schedule ) ruleRoutine ( grafanaCtx context . Context , key models . AlertRuleKey , evalCh <- chan * evalContext , stopCh <- chan struct { } , stateManager * state . Manager ) error {
2021-04-04 01:13:29 +08:00
sch . log . Debug ( "alert rule routine started" , "key" , key )
2020-12-17 22:00:09 +08:00
evalRunning := false
var attempt int64
2021-04-04 01:13:29 +08:00
var alertRule * models . AlertRule
2020-12-17 22:00:09 +08:00
for {
select {
case ctx := <- evalCh :
if evalRunning {
continue
}
evaluate := func ( attempt int64 ) error {
2021-05-15 04:13:44 +08:00
start := timeNow ( )
2020-12-17 22:00:09 +08:00
2021-04-04 01:13:29 +08:00
// fetch latest alert rule version
if alertRule == nil || alertRule . Version < ctx . version {
q := models . GetAlertRuleByUIDQuery { OrgID : key . OrgID , UID : key . UID }
err := sch . ruleStore . GetAlertRuleByUID ( & q )
2020-12-17 22:00:09 +08:00
if err != nil {
2021-04-04 01:13:29 +08:00
sch . log . Error ( "failed to fetch alert rule" , "key" , key )
2020-12-17 22:00:09 +08:00
return err
}
2021-04-04 01:13:29 +08:00
alertRule = q . Result
sch . log . Debug ( "new alert rule version fetched" , "title" , alertRule . Title , "key" , key , "version" , alertRule . Version )
2020-12-17 22:00:09 +08:00
}
2021-03-12 00:56:58 +08:00
condition := models . Condition {
2021-04-04 01:13:29 +08:00
Condition : alertRule . Condition ,
OrgID : alertRule . OrgID ,
Data : alertRule . Data ,
2020-12-17 22:00:09 +08:00
}
2021-03-08 14:02:49 +08:00
results , err := sch . evaluator . ConditionEval ( & condition , ctx . now , sch . dataService )
2021-05-15 04:13:44 +08:00
var (
end = timeNow ( )
tenant = fmt . Sprint ( alertRule . OrgID )
dur = end . Sub ( start ) . Seconds ( )
)
sch . metrics . EvalTotal . WithLabelValues ( tenant ) . Inc ( )
sch . metrics . EvalDuration . WithLabelValues ( tenant ) . Observe ( dur )
2020-12-17 22:00:09 +08:00
if err != nil {
2021-05-15 04:13:44 +08:00
sch . metrics . EvalFailures . WithLabelValues ( tenant ) . Inc ( )
2021-01-19 02:57:17 +08:00
// consider saving alert instance on error
2021-04-04 01:13:29 +08:00
sch . log . Error ( "failed to evaluate alert rule" , "title" , alertRule . Title ,
2021-03-08 14:02:49 +08:00
"key" , key , "attempt" , attempt , "now" , ctx . now , "duration" , end . Sub ( start ) , "error" , err )
2020-12-17 22:00:09 +08:00
return err
}
2021-04-02 23:11:33 +08:00
2021-04-24 03:32:25 +08:00
processedStates := stateManager . ProcessEvalResults ( alertRule , results )
2021-04-02 23:11:33 +08:00
sch . saveAlertStates ( processedStates )
2021-05-20 04:15:09 +08:00
alerts := FromAlertStateToPostableAlerts ( processedStates , stateManager )
2021-04-22 02:57:17 +08:00
sch . log . Debug ( "sending alerts to notifier" , "count" , len ( alerts . PostableAlerts ) , "alerts" , alerts . PostableAlerts )
2021-04-02 23:11:33 +08:00
err = sch . sendAlerts ( alerts )
2021-03-31 00:37:56 +08:00
if err != nil {
2021-04-19 14:58:44 +08:00
sch . log . Error ( "failed to put alerts in the notifier" , "count" , len ( alerts . PostableAlerts ) , "err" , err )
2021-03-31 00:37:56 +08:00
}
2020-12-17 22:00:09 +08:00
return nil
}
func ( ) {
evalRunning = true
defer func ( ) {
evalRunning = false
2021-03-03 23:52:19 +08:00
sch . evalApplied ( key , ctx . now )
2020-12-17 22:00:09 +08:00
} ( )
2021-03-03 23:52:19 +08:00
for attempt = 0 ; attempt < sch . maxAttempts ; attempt ++ {
2020-12-17 22:00:09 +08:00
err := evaluate ( attempt )
if err == nil {
break
}
}
} ( )
2021-01-11 22:14:03 +08:00
case <- stopCh :
2021-03-03 23:52:19 +08:00
sch . stopApplied ( key )
2021-04-04 01:13:29 +08:00
sch . log . Debug ( "stopping alert rule routine" , "key" , key )
2021-01-11 22:14:03 +08:00
// interrupt evaluation if it's running
return nil
2020-12-17 22:00:09 +08:00
case <- grafanaCtx . Done ( ) :
return grafanaCtx . Err ( )
}
}
}
2021-03-31 00:37:56 +08:00
// Notifier handles the delivery of alert notifications to the end user
type Notifier interface {
2021-04-19 14:58:44 +08:00
PutAlerts ( alerts apimodels . PostableAlerts ) error
2021-03-31 00:37:56 +08:00
}
2020-12-17 22:00:09 +08:00
type schedule struct {
// base tick rate (fastest possible configured check)
baseInterval time . Duration
2021-04-04 01:13:29 +08:00
// each alert rule gets its own channel and routine
registry alertRuleRegistry
2020-12-17 22:00:09 +08:00
maxAttempts int64
clock clock . Clock
heartbeat * alerting . Ticker
// evalApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from evalApplied is handled.
2021-04-04 01:13:29 +08:00
evalAppliedFunc func ( models . AlertRuleKey , time . Time )
2020-12-17 22:00:09 +08:00
2021-01-11 22:14:03 +08:00
// stopApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from stopApplied is handled.
2021-04-04 01:13:29 +08:00
stopAppliedFunc func ( models . AlertRuleKey )
2021-01-11 22:14:03 +08:00
2020-12-17 22:00:09 +08:00
log log . Logger
2021-01-23 01:27:33 +08:00
evaluator eval . Evaluator
2021-03-03 23:52:19 +08:00
2021-04-04 01:13:29 +08:00
ruleStore store . RuleStore
2021-05-03 19:19:15 +08:00
instanceStore store . InstanceStore
2021-03-08 14:02:49 +08:00
dataService * tsdb . Service
2021-03-31 00:37:56 +08:00
notifier Notifier
2021-05-15 04:13:44 +08:00
metrics * metrics . Metrics
2021-01-23 01:27:33 +08:00
}
2021-03-09 04:19:21 +08:00
// SchedulerCfg is the scheduler configuration.
type SchedulerCfg struct {
C clock . Clock
BaseInterval time . Duration
Logger log . Logger
2021-04-04 01:13:29 +08:00
EvalAppliedFunc func ( models . AlertRuleKey , time . Time )
2021-03-09 04:19:21 +08:00
MaxAttempts int64
2021-04-04 01:13:29 +08:00
StopAppliedFunc func ( models . AlertRuleKey )
2021-03-09 04:19:21 +08:00
Evaluator eval . Evaluator
2021-04-04 01:13:29 +08:00
RuleStore store . RuleStore
2021-05-03 19:19:15 +08:00
InstanceStore store . InstanceStore
2021-03-31 00:37:56 +08:00
Notifier Notifier
2021-05-15 04:13:44 +08:00
Metrics * metrics . Metrics
2020-12-17 22:00:09 +08:00
}
2021-03-09 04:19:21 +08:00
// NewScheduler returns a new schedule.
func NewScheduler ( cfg SchedulerCfg , dataService * tsdb . Service ) * schedule {
ticker := alerting . NewTicker ( cfg . C . Now ( ) , time . Second * 0 , cfg . C , int64 ( cfg . BaseInterval . Seconds ( ) ) )
2020-12-17 22:00:09 +08:00
sch := schedule {
2021-04-04 01:13:29 +08:00
registry : alertRuleRegistry { alertRuleInfo : make ( map [ models . AlertRuleKey ] alertRuleInfo ) } ,
2021-03-09 04:19:21 +08:00
maxAttempts : cfg . MaxAttempts ,
clock : cfg . C ,
baseInterval : cfg . BaseInterval ,
log : cfg . Logger ,
2021-03-03 23:52:19 +08:00
heartbeat : ticker ,
2021-03-09 04:19:21 +08:00
evalAppliedFunc : cfg . EvalAppliedFunc ,
stopAppliedFunc : cfg . StopAppliedFunc ,
evaluator : cfg . Evaluator ,
2021-04-04 01:13:29 +08:00
ruleStore : cfg . RuleStore ,
2021-05-03 19:19:15 +08:00
instanceStore : cfg . InstanceStore ,
2021-03-08 14:02:49 +08:00
dataService : dataService ,
2021-03-31 00:37:56 +08:00
notifier : cfg . Notifier ,
2021-05-15 04:13:44 +08:00
metrics : cfg . Metrics ,
2020-12-17 22:00:09 +08:00
}
return & sch
}
2021-03-09 04:19:21 +08:00
func ( sch * schedule ) overrideCfg ( cfg SchedulerCfg ) {
sch . clock = cfg . C
sch . baseInterval = cfg . BaseInterval
sch . heartbeat = alerting . NewTicker ( cfg . C . Now ( ) , time . Second * 0 , cfg . C , int64 ( cfg . BaseInterval . Seconds ( ) ) )
sch . evalAppliedFunc = cfg . EvalAppliedFunc
sch . stopAppliedFunc = cfg . StopAppliedFunc
2021-03-03 23:52:19 +08:00
}
2021-04-04 01:13:29 +08:00
func ( sch * schedule ) evalApplied ( alertDefKey models . AlertRuleKey , now time . Time ) {
2021-03-03 23:52:19 +08:00
if sch . evalAppliedFunc == nil {
return
}
sch . evalAppliedFunc ( alertDefKey , now )
}
2021-04-04 01:13:29 +08:00
func ( sch * schedule ) stopApplied ( alertDefKey models . AlertRuleKey ) {
2021-03-03 23:52:19 +08:00
if sch . stopAppliedFunc == nil {
return
}
sch . stopAppliedFunc ( alertDefKey )
}
func ( sch * schedule ) Pause ( ) error {
2020-12-17 22:00:09 +08:00
if sch == nil {
return fmt . Errorf ( "scheduler is not initialised" )
}
sch . heartbeat . Pause ( )
2021-04-04 01:13:29 +08:00
sch . log . Info ( "alert rule scheduler paused" , "now" , sch . clock . Now ( ) )
2020-12-17 22:00:09 +08:00
return nil
}
2021-03-03 23:52:19 +08:00
func ( sch * schedule ) Unpause ( ) error {
2020-12-17 22:00:09 +08:00
if sch == nil {
return fmt . Errorf ( "scheduler is not initialised" )
}
sch . heartbeat . Unpause ( )
2021-04-04 01:13:29 +08:00
sch . log . Info ( "alert rule scheduler unpaused" , "now" , sch . clock . Now ( ) )
2020-12-17 22:00:09 +08:00
return nil
}
2021-04-24 03:32:25 +08:00
func ( sch * schedule ) Ticker ( grafanaCtx context . Context , stateManager * state . Manager ) error {
2020-12-17 22:00:09 +08:00
dispatcherGroup , ctx := errgroup . WithContext ( grafanaCtx )
for {
select {
2021-03-03 23:52:19 +08:00
case tick := <- sch . heartbeat . C :
tickNum := tick . Unix ( ) / int64 ( sch . baseInterval . Seconds ( ) )
2021-04-04 01:13:29 +08:00
alertRules := sch . fetchAllDetails ( )
sch . log . Debug ( "alert rules fetched" , "count" , len ( alertRules ) )
2020-12-17 22:00:09 +08:00
2021-04-04 01:13:29 +08:00
// registeredDefinitions is a map used for finding deleted alert rules
// initially it is assigned to all known alert rules from the previous cycle
// each alert rule found also in this cycle is removed
// so, at the end, the remaining registered alert rules are the deleted ones
2021-03-03 23:52:19 +08:00
registeredDefinitions := sch . registry . keyMap ( )
2020-12-17 22:00:09 +08:00
type readyToRunItem struct {
2021-04-04 01:13:29 +08:00
key models . AlertRuleKey
ruleInfo alertRuleInfo
2020-12-17 22:00:09 +08:00
}
readyToRun := make ( [ ] readyToRunItem , 0 )
2021-04-04 01:13:29 +08:00
for _ , item := range alertRules {
2021-03-09 04:19:21 +08:00
key := item . GetKey ( )
2020-12-17 22:00:09 +08:00
itemVersion := item . Version
2021-03-03 23:52:19 +08:00
newRoutine := ! sch . registry . exists ( key )
2021-04-04 01:13:29 +08:00
ruleInfo := sch . registry . getOrCreateInfo ( key , itemVersion )
2021-03-03 23:52:19 +08:00
invalidInterval := item . IntervalSeconds % int64 ( sch . baseInterval . Seconds ( ) ) != 0
2020-12-17 22:00:09 +08:00
if newRoutine && ! invalidInterval {
dispatcherGroup . Go ( func ( ) error {
2021-04-24 03:32:25 +08:00
return sch . ruleRoutine ( ctx , key , ruleInfo . evalCh , ruleInfo . stopCh , stateManager )
2020-12-17 22:00:09 +08:00
} )
}
if invalidInterval {
// this is expected to be always false
2021-04-04 01:13:29 +08:00
// give that we validate interval during alert rule updates
sch . log . Debug ( "alert rule with invalid interval will be ignored: interval should be divided exactly by scheduler interval" , "key" , key , "interval" , time . Duration ( item . IntervalSeconds ) * time . Second , "scheduler interval" , sch . baseInterval )
2020-12-17 22:00:09 +08:00
continue
}
2021-03-03 23:52:19 +08:00
itemFrequency := item . IntervalSeconds / int64 ( sch . baseInterval . Seconds ( ) )
2020-12-17 22:00:09 +08:00
if item . IntervalSeconds != 0 && tickNum % itemFrequency == 0 {
2021-04-04 01:13:29 +08:00
readyToRun = append ( readyToRun , readyToRunItem { key : key , ruleInfo : ruleInfo } )
2020-12-17 22:00:09 +08:00
}
2021-04-04 01:13:29 +08:00
// remove the alert rule from the registered alert rules
2021-01-07 23:45:42 +08:00
delete ( registeredDefinitions , key )
2020-12-17 22:00:09 +08:00
}
var step int64 = 0
if len ( readyToRun ) > 0 {
2021-03-03 23:52:19 +08:00
step = sch . baseInterval . Nanoseconds ( ) / int64 ( len ( readyToRun ) )
2020-12-17 22:00:09 +08:00
}
for i := range readyToRun {
item := readyToRun [ i ]
time . AfterFunc ( time . Duration ( int64 ( i ) * step ) , func ( ) {
2021-04-04 01:13:29 +08:00
item . ruleInfo . evalCh <- & evalContext { now : tick , version : item . ruleInfo . version }
2020-12-17 22:00:09 +08:00
} )
}
2021-04-04 01:13:29 +08:00
// unregister and stop routines of the deleted alert rules
2021-01-07 23:45:42 +08:00
for key := range registeredDefinitions {
2021-04-04 01:13:29 +08:00
ruleInfo , err := sch . registry . get ( key )
2021-01-11 22:14:03 +08:00
if err != nil {
2021-04-04 01:13:29 +08:00
sch . log . Error ( "failed to get alert rule routine information" , "err" , err )
2021-01-11 22:14:03 +08:00
continue
}
2021-04-04 01:13:29 +08:00
ruleInfo . stopCh <- struct { } { }
2021-03-03 23:52:19 +08:00
sch . registry . del ( key )
2020-12-17 22:00:09 +08:00
}
case <- grafanaCtx . Done ( ) :
2021-05-12 19:17:43 +08:00
waitErr := dispatcherGroup . Wait ( )
orgIds , err := sch . instanceStore . FetchOrgIds ( )
if err != nil {
2021-05-05 00:57:50 +08:00
sch . log . Error ( "unable to fetch orgIds" , "msg" , err . Error ( ) )
}
2021-05-12 19:17:43 +08:00
for _ , v := range orgIds {
sch . saveAlertStates ( stateManager . GetAll ( v ) )
2021-05-05 00:57:50 +08:00
}
2021-05-12 19:17:43 +08:00
2021-04-24 03:32:25 +08:00
stateManager . Close ( )
2021-05-12 19:17:43 +08:00
return waitErr
2020-12-17 22:00:09 +08:00
}
}
}
2021-04-19 14:58:44 +08:00
func ( sch * schedule ) sendAlerts ( alerts apimodels . PostableAlerts ) error {
return sch . notifier . PutAlerts ( alerts )
2021-03-31 00:37:56 +08:00
}
2021-04-24 03:32:25 +08:00
func ( sch * schedule ) saveAlertStates ( states [ ] * state . State ) {
2021-04-02 23:11:33 +08:00
sch . log . Debug ( "saving alert states" , "count" , len ( states ) )
for _ , s := range states {
cmd := models . SaveAlertInstanceCommand {
2021-05-03 19:19:15 +08:00
RuleOrgID : s . OrgID ,
RuleUID : s . AlertRuleUID ,
2021-04-02 23:11:33 +08:00
Labels : models . InstanceLabels ( s . Labels ) ,
State : models . InstanceStateType ( s . State . String ( ) ) ,
LastEvalTime : s . LastEvaluationTime ,
CurrentStateSince : s . StartsAt ,
CurrentStateEnd : s . EndsAt ,
}
2021-05-03 19:19:15 +08:00
err := sch . instanceStore . SaveAlertInstance ( & cmd )
2021-04-02 23:11:33 +08:00
if err != nil {
2021-04-22 00:30:03 +08:00
sch . log . Error ( "failed to save alert state" , "uid" , s . AlertRuleUID , "orgId" , s . OrgID , "labels" , s . Labels . String ( ) , "state" , s . State . String ( ) , "msg" , err . Error ( ) )
2021-04-02 23:11:33 +08:00
}
}
}
2021-04-24 03:32:25 +08:00
func ( sch * schedule ) WarmStateCache ( st * state . Manager ) {
2021-04-02 23:11:33 +08:00
sch . log . Info ( "warming cache for startup" )
st . ResetCache ( )
2021-05-12 19:17:43 +08:00
orgIds , err := sch . instanceStore . FetchOrgIds ( )
if err != nil {
2021-04-02 23:11:33 +08:00
sch . log . Error ( "unable to fetch orgIds" , "msg" , err . Error ( ) )
}
2021-04-24 03:32:25 +08:00
var states [ ] * state . State
2021-05-12 19:17:43 +08:00
for _ , orgId := range orgIds {
2021-05-01 02:23:12 +08:00
// Get Rules
ruleCmd := models . ListAlertRulesQuery {
2021-05-12 19:17:43 +08:00
OrgID : orgId ,
2021-05-01 02:23:12 +08:00
}
if err := sch . ruleStore . GetOrgAlertRules ( & ruleCmd ) ; err != nil {
sch . log . Error ( "unable to fetch previous state" , "msg" , err . Error ( ) )
}
ruleByUID := make ( map [ string ] * models . AlertRule , len ( ruleCmd . Result ) )
for _ , rule := range ruleCmd . Result {
ruleByUID [ rule . UID ] = rule
}
// Get Instances
2021-04-02 23:11:33 +08:00
cmd := models . ListAlertInstancesQuery {
2021-05-12 19:17:43 +08:00
RuleOrgID : orgId ,
2021-04-02 23:11:33 +08:00
}
2021-05-03 19:19:15 +08:00
if err := sch . instanceStore . ListAlertInstances ( & cmd ) ; err != nil {
2021-04-02 23:11:33 +08:00
sch . log . Error ( "unable to fetch previous state" , "msg" , err . Error ( ) )
}
2021-05-01 02:23:12 +08:00
2021-04-02 23:11:33 +08:00
for _ , entry := range cmd . Result {
2021-05-12 19:17:43 +08:00
ruleForEntry , ok := ruleByUID [ entry . RuleUID ]
2021-05-01 02:23:12 +08:00
if ! ok {
2021-05-12 19:17:43 +08:00
sch . log . Error ( "rule not found for instance, ignoring" , "rule" , entry . RuleUID )
2021-05-02 01:01:28 +08:00
continue
2021-05-01 02:23:12 +08:00
}
2021-04-14 05:38:09 +08:00
lbs := map [ string ] string ( entry . Labels )
2021-04-28 23:42:19 +08:00
cacheId , err := entry . Labels . StringKey ( )
if err != nil {
sch . log . Error ( "error getting cacheId for entry" , "msg" , err . Error ( ) )
}
2021-04-24 03:32:25 +08:00
stateForEntry := & state . State {
2021-05-12 19:17:43 +08:00
AlertRuleUID : entry . RuleUID ,
2021-05-01 02:23:12 +08:00
OrgID : entry . RuleOrgID ,
2021-04-28 23:42:19 +08:00
CacheId : cacheId ,
2021-04-02 23:11:33 +08:00
Labels : lbs ,
State : translateInstanceState ( entry . CurrentState ) ,
2021-04-24 03:32:25 +08:00
Results : [ ] state . Evaluation { } ,
2021-04-02 23:11:33 +08:00
StartsAt : entry . CurrentStateSince ,
EndsAt : entry . CurrentStateEnd ,
LastEvaluationTime : entry . LastEvalTime ,
2021-05-01 02:23:12 +08:00
Annotations : ruleForEntry . Annotations ,
2021-04-02 23:11:33 +08:00
}
states = append ( states , stateForEntry )
}
}
st . Put ( states )
}
// translateInstanceState maps a persisted instance state to its evaluation
// state: firing becomes Alerting, normal becomes Normal, and anything else is
// treated as Error.
func translateInstanceState(instanceState models.InstanceStateType) eval.State {
	switch instanceState {
	case models.InstanceStateFiring:
		return eval.Alerting
	case models.InstanceStateNormal:
		return eval.Normal
	default:
		return eval.Error
	}
}
2021-04-04 01:13:29 +08:00
type alertRuleRegistry struct {
mu sync . Mutex
alertRuleInfo map [ models . AlertRuleKey ] alertRuleInfo
2020-12-17 22:00:09 +08:00
}
2021-04-04 01:13:29 +08:00
// getOrCreateInfo returns the channel for the specific alert rule
2020-12-17 22:00:09 +08:00
// if it does not exists creates one and returns it
2021-04-04 01:13:29 +08:00
func ( r * alertRuleRegistry ) getOrCreateInfo ( key models . AlertRuleKey , ruleVersion int64 ) alertRuleInfo {
2020-12-17 22:00:09 +08:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-04-04 01:13:29 +08:00
info , ok := r . alertRuleInfo [ key ]
2020-12-17 22:00:09 +08:00
if ! ok {
2021-04-04 01:13:29 +08:00
r . alertRuleInfo [ key ] = alertRuleInfo { evalCh : make ( chan * evalContext ) , stopCh : make ( chan struct { } ) , version : ruleVersion }
return r . alertRuleInfo [ key ]
2020-12-17 22:00:09 +08:00
}
2021-04-04 01:13:29 +08:00
info . version = ruleVersion
r . alertRuleInfo [ key ] = info
2020-12-17 22:00:09 +08:00
return info
}
2021-04-04 01:13:29 +08:00
// get returns the channel for the specific alert rule
2021-01-11 22:14:03 +08:00
// if the key does not exist returns an error
2021-04-04 01:13:29 +08:00
func ( r * alertRuleRegistry ) get ( key models . AlertRuleKey ) ( * alertRuleInfo , error ) {
2021-01-11 22:14:03 +08:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-04-04 01:13:29 +08:00
info , ok := r . alertRuleInfo [ key ]
2021-01-11 22:14:03 +08:00
if ! ok {
return nil , fmt . Errorf ( "%v key not found" , key )
}
return & info , nil
}
2021-04-04 01:13:29 +08:00
func ( r * alertRuleRegistry ) exists ( key models . AlertRuleKey ) bool {
2020-12-17 22:00:09 +08:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-04-04 01:13:29 +08:00
_ , ok := r . alertRuleInfo [ key ]
2020-12-17 22:00:09 +08:00
return ok
}
2021-04-04 01:13:29 +08:00
func ( r * alertRuleRegistry ) del ( key models . AlertRuleKey ) {
2020-12-17 22:00:09 +08:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-04-04 01:13:29 +08:00
delete ( r . alertRuleInfo , key )
2020-12-17 22:00:09 +08:00
}
2021-04-04 01:13:29 +08:00
func ( r * alertRuleRegistry ) iter ( ) <- chan models . AlertRuleKey {
c := make ( chan models . AlertRuleKey )
2020-12-17 22:00:09 +08:00
f := func ( ) {
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-04-04 01:13:29 +08:00
for k := range r . alertRuleInfo {
2020-12-17 22:00:09 +08:00
c <- k
}
close ( c )
}
go f ( )
return c
}
2021-04-04 01:13:29 +08:00
func ( r * alertRuleRegistry ) keyMap ( ) map [ models . AlertRuleKey ] struct { } {
definitionsIDs := make ( map [ models . AlertRuleKey ] struct { } )
2021-01-07 23:45:42 +08:00
for k := range r . iter ( ) {
definitionsIDs [ k ] = struct { } { }
2020-12-17 22:00:09 +08:00
}
return definitionsIDs
}
2021-04-04 01:13:29 +08:00
type alertRuleInfo struct {
2021-01-11 22:14:03 +08:00
evalCh chan * evalContext
stopCh chan struct { }
2020-12-17 22:00:09 +08:00
version int64
}
// evalContext is a single evaluation request sent to a rule routine.
type evalContext struct {
	now     time.Time // heartbeat tick the request belongs to
	version int64     // rule version known to the scheduler at dispatch time
}