mirror of https://github.com/grafana/grafana.git
alerting: add lock on job to prevent a race condition (#18218)
Without this lock there is a race condition between the scheduler and job processing.
parent e3181e66b4
commit 364d2358d8
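To make the failure mode concrete, here is a minimal, standalone Go sketch (not Grafana code; the lowercase job type, field names, and loop counts are purely illustrative) of the pattern this commit introduces: an unexported running flag that is only read and written through mutex-guarded accessors. Before the change, the scheduler read Job.Running while the engine wrote it from another goroutine with no synchronization, which the Go race detector (go run -race / go test -race) reports as a data race; with accessors like the ones below the detector stays quiet.

package main

import (
	"fmt"
	"sync"
)

// job mirrors the pattern from the commit: an unexported flag guarded by a mutex.
type job struct {
	mu      sync.Mutex
	running bool
}

// SetRunning writes the flag under the lock, like Job.SetRunning in the diff.
func (j *job) SetRunning(b bool) {
	j.mu.Lock()
	j.running = b
	j.mu.Unlock()
}

// GetRunning reads the flag under the lock, like Job.GetRunning in the diff.
func (j *job) GetRunning() bool {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.running
}

func main() {
	j := &job{}
	var wg sync.WaitGroup
	wg.Add(2)

	// "Engine" goroutine: flips the flag around each simulated evaluation.
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			j.SetRunning(true)
			j.SetRunning(false)
		}
	}()

	// "Scheduler" goroutine: polls the flag to decide whether to enqueue the job.
	go func() {
		defer wg.Done()
		skipped := 0
		for i := 0; i < 1000; i++ {
			if j.GetRunning() {
				skipped++
			}
		}
		fmt.Println("skipped while running:", skipped)
	}()

	wg.Wait()
}

If the accessors are replaced with direct, unguarded reads and writes of the field, running the same program with -race reproduces the kind of report this commit eliminates.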
@@ -117,7 +117,7 @@ func (e *AlertEngine) processJobWithRetry(grafanaCtx context.Context, job *Job)

 	// Initialize with first attemptID=1
 	attemptChan <- 1
-	job.Running = true
+	job.SetRunning(true)

 	for {
 		select {
@@ -141,7 +141,7 @@ func (e *AlertEngine) processJobWithRetry(grafanaCtx context.Context, job *Job)
 }

 func (e *AlertEngine) endJob(err error, cancelChan chan context.CancelFunc, job *Job) error {
-	job.Running = false
+	job.SetRunning(false)
 	close(cancelChan)
 	for cancelFn := range cancelChan {
 		cancelFn()
@@ -22,7 +22,7 @@ func TestEngineTimeouts(t *testing.T) {
 	setting.AlertingNotificationTimeout = 30 * time.Second
 	setting.AlertingMaxAttempts = 3
 	engine.resultHandler = &FakeResultHandler{}
-	job := &Job{Running: true, Rule: &Rule{}}
+	job := &Job{running: true, Rule: &Rule{}}

 	Convey("Should trigger as many retries as needed", func() {
 		Convey("pended alert for datasource -> result handler should be worked", func() {
@@ -45,7 +45,7 @@ func TestEngineProcessJob(t *testing.T) {
 	setting.AlertingNotificationTimeout = 30 * time.Second
 	setting.AlertingMaxAttempts = 3
 	engine.resultHandler = &FakeResultHandler{}
-	job := &Job{Running: true, Rule: &Rule{}}
+	job := &Job{running: true, Rule: &Rule{}}

 	Convey("Should trigger retry if needed", func() {
@@ -1,14 +1,33 @@
 package alerting

-import "github.com/grafana/grafana/pkg/components/null"
+import (
+	"sync"
+
+	"github.com/grafana/grafana/pkg/components/null"
+)

 // Job holds state about when the alert rule should be evaluated.
 type Job struct {
 	Offset     int64
 	OffsetWait bool
 	Delay      bool
-	Running    bool
+	running    bool
 	Rule       *Rule
+	runningLock sync.Mutex // Lock for running property which is used in the Scheduler and AlertEngine execution
 }

+// GetRunning returns true if the job is running. A lock is taken and released on the Job to ensure atomicity.
+func (j *Job) GetRunning() bool {
+	defer j.runningLock.Unlock()
+	j.runningLock.Lock()
+	return j.running
+}
+
+// SetRunning sets the running property on the Job. A lock is taken and released on the Job to ensure atomicity.
+func (j *Job) SetRunning(b bool) {
+	j.runningLock.Lock()
+	j.running = b
+	j.runningLock.Unlock()
+}
+
 // ResultLogEntry represents log data for the alert evaluation.
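Not part of this commit, but a quick way to guard against regressions: a hedged sketch of a test in the same alerting package (the test name is illustrative) that hammers the new accessors from two goroutines, so go test -race would flag any return to unsynchronized access of the running field.

package alerting

import (
	"sync"
	"testing"
)

// TestJobRunningIsSafeForConcurrentUse exercises GetRunning and SetRunning
// concurrently; run with `go test -race` to let the race detector verify them.
func TestJobRunningIsSafeForConcurrentUse(t *testing.T) {
	job := &Job{}
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			job.SetRunning(i%2 == 0)
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			_ = job.GetRunning()
		}
	}()
	wg.Wait()
}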
@@ -30,9 +30,8 @@ func (s *schedulerImpl) Update(rules []*Rule) {
 		if s.jobs[rule.ID] != nil {
 			job = s.jobs[rule.ID]
 		} else {
-			job = &Job{
-				Running: false,
-			}
+			job = &Job{}
+			job.SetRunning(false)
 		}

 		job.Rule = rule
@@ -52,7 +51,7 @@ func (s *schedulerImpl) Tick(tickTime time.Time, execQueue chan *Job) {
 	now := tickTime.Unix()

 	for _, job := range s.jobs {
-		if job.Running || job.Rule.State == models.AlertStatePaused {
+		if job.GetRunning() || job.Rule.State == models.AlertStatePaused {
 			continue
 		}
