diff --git a/pkg/services/alerting/alert_rule_reader.go b/pkg/services/alerting/alert_rule_reader.go index a6314c64eba..83946dc07d3 100644 --- a/pkg/services/alerting/alert_rule_reader.go +++ b/pkg/services/alerting/alert_rule_reader.go @@ -12,11 +12,11 @@ type AlertRuleReader struct{} func (this AlertRuleReader) Fetch() []m.AlertRule { return []m.AlertRule{ - {Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10}, - {Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10}, - {Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10}, - {Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5}, - {Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5}, + //{Id: 1, Title: "alert rule 1", Interval: "10s", Frequency: 10}, + //{Id: 2, Title: "alert rule 2", Interval: "10s", Frequency: 10}, + //{Id: 3, Title: "alert rule 3", Interval: "10s", Frequency: 10}, + //{Id: 4, Title: "alert rule 4", Interval: "10s", Frequency: 5}, + //{Id: 5, Title: "alert rule 5", Interval: "10s", Frequency: 5}, {Id: 6, Title: "alert rule 6", Interval: "10s", Frequency: 1}, } } diff --git a/pkg/services/alerting/alerting.go b/pkg/services/alerting/alerting.go index ca3f11efec5..584011e05e5 100644 --- a/pkg/services/alerting/alerting.go +++ b/pkg/services/alerting/alerting.go @@ -5,7 +5,7 @@ import ( "strconv" "time" - "github.com/grafana/grafana/pkg/bus" + //"github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/log" m "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/setting" @@ -22,12 +22,14 @@ func Init() { scheduler := NewScheduler() go scheduler.Dispatch(&AlertRuleReader{}) go scheduler.Executor(&DummieExecutor{}) + go scheduler.HandleResponses() } type Scheduler struct { - jobs []*AlertJob - runQueue chan *AlertJob - mtx sync.RWMutex + jobs map[int64]*AlertJob + runQueue chan *AlertJob + responseQueue chan *AlertResult + mtx sync.RWMutex alertRuleFetcher RuleReader @@ -38,30 +40,35 @@ type Scheduler struct { func NewScheduler() *Scheduler { return &Scheduler{ - jobs: make([]*AlertJob, 0), - runQueue: make(chan *AlertJob, 1000), - serverId: strconv.Itoa(rand.Intn(1000)), + jobs: make(map[int64]*AlertJob, 0), + runQueue: make(chan *AlertJob, 1000), + responseQueue: make(chan *AlertResult, 1000), + serverId: strconv.Itoa(rand.Intn(1000)), } } func (this *Scheduler) heartBeat() { - //write heartBeat to db. - //get the modulus position of active servers - cmd := &m.HeartBeatCommand{ServerId: this.serverId} + //Lets cheat on this until we focus on clustering log.Info("Heartbeat: Sending heartbeat from " + this.serverId) - err := bus.Dispatch(cmd) + this.clusterSize = 1 + this.serverPosition = 1 - if err != nil { - log.Error(1, "Failed to send heartbeat.") - } else { - this.clusterSize = cmd.Result.ClusterSize - this.serverPosition = cmd.Result.UptimePosition - } + /* + cmd := &m.HeartBeatCommand{ServerId: this.serverId} + err := bus.Dispatch(cmd) + + if err != nil { + log.Error(1, "Failed to send heartbeat.") + } else { + this.clusterSize = cmd.Result.ClusterSize + this.serverPosition = cmd.Result.UptimePosition + } + */ } func (this *Scheduler) Dispatch(reader RuleReader) { - reschedule := time.NewTicker(time.Second * 10) + reschedule := time.NewTicker(time.Second * 100) secondTicker := time.NewTicker(time.Second) heartbeat := time.NewTicker(time.Second * 5) @@ -83,7 +90,7 @@ func (this *Scheduler) Dispatch(reader RuleReader) { func (this *Scheduler) updateJobs(f func() []m.AlertRule) { log.Debug("Scheduler: UpdateJobs()") - jobs := make([]*AlertJob, 0) + jobs := make(map[int64]*AlertJob, 0) rules := f() this.mtx.Lock() @@ -91,10 +98,7 @@ func (this *Scheduler) updateJobs(f func() []m.AlertRule) { for i := this.serverPosition - 1; i < len(rules); i += this.clusterSize { rule := rules[i] - jobs = append(jobs, &AlertJob{ - rule: rule, - offset: int64(len(jobs)), - }) + jobs[rule.Id] = &AlertJob{rule: rule, offset: int64(len(jobs))} } log.Debug("Scheduler: Selected %d jobs", len(jobs)) @@ -117,15 +121,21 @@ func (this *Scheduler) Executor(executor Executor) { for job := range this.runQueue { log.Info("Executor: queue length %d", len(this.runQueue)) log.Info("Executor: executing %s", job.rule.Title) - go Measure(executor, job) + this.jobs[job.rule.Id].running = true + go this.Measure(executor, job) } } -func Measure(exec Executor, rule *AlertJob) { +func (this *Scheduler) HandleResponses() { + for response := range this.responseQueue { + log.Info("Response: alert %d returned %s", response.id, response.state) + this.jobs[response.id].running = false + } +} + +func (this *Scheduler) Measure(exec Executor, rule *AlertJob) { now := time.Now() - rule.running = true - exec.Execute(rule.rule) - rule.running = true + exec.Execute(rule.rule, this.responseQueue) elapsed := time.Since(now) log.Info("Schedular: exeuction took %v milli seconds", elapsed.Nanoseconds()/1000000) } diff --git a/pkg/services/alerting/alerting_test.go b/pkg/services/alerting/alerting_test.go index 8e6b118ed40..0e504b89a33 100644 --- a/pkg/services/alerting/alerting_test.go +++ b/pkg/services/alerting/alerting_test.go @@ -21,7 +21,7 @@ func TestAlertingScheduler(t *testing.T) { Convey("single server", func() { scheduler := &Scheduler{ - jobs: make([]*AlertJob, 0), + jobs: make(map[int64]*AlertJob, 0), runQueue: make(chan *AlertJob, 1000), serverId: "", serverPosition: 1, @@ -34,7 +34,7 @@ func TestAlertingScheduler(t *testing.T) { Convey("two servers", func() { scheduler := &Scheduler{ - jobs: make([]*AlertJob, 0), + jobs: make(map[int64]*AlertJob, 0), runQueue: make(chan *AlertJob, 1000), serverId: "", serverPosition: 1, @@ -43,12 +43,12 @@ func TestAlertingScheduler(t *testing.T) { scheduler.updateJobs(mockFn) So(len(scheduler.jobs), ShouldEqual, 3) - So(scheduler.jobs[0].rule.Id, ShouldEqual, 1) + So(scheduler.jobs[1].rule.Id, ShouldEqual, 1) }) Convey("six servers", func() { scheduler := &Scheduler{ - jobs: make([]*AlertJob, 0), + jobs: make(map[int64]*AlertJob, 0), runQueue: make(chan *AlertJob, 1000), serverId: "", serverPosition: 6, @@ -57,7 +57,7 @@ func TestAlertingScheduler(t *testing.T) { scheduler.updateJobs(mockFn) So(len(scheduler.jobs), ShouldEqual, 1) - So(scheduler.jobs[0].rule.Id, ShouldEqual, 6) + So(scheduler.jobs[6].rule.Id, ShouldEqual, 6) }) Convey("more servers then alerts", func() { @@ -68,7 +68,7 @@ func TestAlertingScheduler(t *testing.T) { } scheduler := &Scheduler{ - jobs: make([]*AlertJob, 0), + jobs: make(map[int64]*AlertJob, 0), runQueue: make(chan *AlertJob, 1000), serverId: "", serverPosition: 3, diff --git a/pkg/services/alerting/executor.go b/pkg/services/alerting/executor.go index fcde8be8865..a40b8264814 100644 --- a/pkg/services/alerting/executor.go +++ b/pkg/services/alerting/executor.go @@ -7,16 +7,17 @@ import ( ) type Executor interface { - Execute(rule m.AlertRule) (err error, result AlertResult) + Execute(rule m.AlertRule, responseQueue chan *AlertResult) } type DummieExecutor struct{} -func (this DummieExecutor) Execute(rule m.AlertRule) (err error, result AlertResult) { - if rule.Id == 6 { - time.Sleep(time.Second * 60) - } +func (this DummieExecutor) Execute(rule m.AlertRule, responseQueue chan *AlertResult) { + //if rule.Id == 6 { + // time.Sleep(time.Second * 60) + //} time.Sleep(time.Second) log.Info("Finnished executing: %d", rule.Id) - return nil, AlertResult{state: "OK", id: rule.Id} + responseQueue <- &AlertResult{state: "OK", id: rule.Id} + //return nil, } diff --git a/pkg/services/sqlstore/alert_heartbeat_test.go b/pkg/services/sqlstore/alert_heartbeat_test.go new file mode 100644 index 00000000000..196d3fd7dde --- /dev/null +++ b/pkg/services/sqlstore/alert_heartbeat_test.go @@ -0,0 +1,18 @@ +package sqlstore + +import ( + "testing" + + // m "github.com/grafana/grafana/pkg/models" + . "github.com/smartystreets/goconvey/convey" +) + +func TestAlertingHeartbeatDataAccess(t *testing.T) { + + Convey("Testing Alerting data access", t, func() { + InitTestDB(t) + //send heartbeat from server 1 + //send heartbeat from server 2 + + }) +}