mirror of https://github.com/grafana/grafana.git
Provisioning: Fix bug in job progress recording (#108440)
Fix bug in job progress recording
This commit is contained in:
parent
9402071d33
commit
482872c3bf
|
|
@ -175,10 +175,12 @@ func (d *jobDriver) claimAndProcessOneJob(ctx context.Context) error {
|
|||
jobctx, cancel := context.WithTimeout(ctx, d.jobTimeout)
|
||||
defer cancel() // Ensure resources are released when the function returns
|
||||
|
||||
recorder := newJobProgressRecorder(d.onProgress(job))
|
||||
|
||||
// Process the job.
|
||||
start := time.Now()
|
||||
job.Status.Started = start.UnixMilli()
|
||||
err = d.processJob(jobctx, job) // NOTE: We pass in a pointer here such that the job status can be kept in Complete without re-fetching.
|
||||
err = d.processJob(jobctx, job, recorder) // NOTE: We pass in a pointer here such that the job status can be kept in Complete without re-fetching.
|
||||
end := time.Now()
|
||||
logger.Debug("job processed", "duration", end.Sub(start), "error", err)
|
||||
|
||||
|
|
@ -187,17 +189,7 @@ func (d *jobDriver) claimAndProcessOneJob(ctx context.Context) error {
|
|||
err = jobctx.Err()
|
||||
}
|
||||
|
||||
// Mark the job as failed and remove from queue
|
||||
if err != nil {
|
||||
job.Status.State = provisioning.JobStateError
|
||||
job.Status.Errors = append(job.Status.Errors, err.Error())
|
||||
}
|
||||
|
||||
job.Status.Progress = 0 // clear progressbar
|
||||
job.Status.Finished = end.UnixMilli()
|
||||
if !job.Status.State.Finished() {
|
||||
job.Status.State = provisioning.JobStateSuccess // no error
|
||||
}
|
||||
job.Status = recorder.Complete(ctx, err)
|
||||
|
||||
// Save the finished job
|
||||
err = d.historicJobs.WriteJob(ctx, job.DeepCopy())
|
||||
|
|
@ -217,7 +209,7 @@ func (d *jobDriver) claimAndProcessOneJob(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *jobDriver) processJob(ctx context.Context, job *provisioning.Job) error {
|
||||
func (d *jobDriver) processJob(ctx context.Context, job *provisioning.Job, recorder JobProgressRecorder) error {
|
||||
for _, worker := range d.workers {
|
||||
if !worker.IsSupported(ctx, *job) {
|
||||
continue
|
||||
|
|
@ -228,16 +220,7 @@ func (d *jobDriver) processJob(ctx context.Context, job *provisioning.Job) error
|
|||
return apifmt.Errorf("failed to get repository '%s': %w", job.Spec.Repository, err)
|
||||
}
|
||||
|
||||
recorder := newJobProgressRecorder(d.onProgress(job))
|
||||
|
||||
err = worker.Process(ctx, repo, *job, recorder)
|
||||
if err != nil {
|
||||
return apifmt.Errorf("worker failed to process job: %w", err)
|
||||
}
|
||||
|
||||
job.Status = recorder.Complete(ctx, err)
|
||||
|
||||
return nil
|
||||
return worker.Process(ctx, repo, *job, recorder)
|
||||
}
|
||||
|
||||
return apifmt.Errorf("no workers were registered to handle the job")
|
||||
|
|
|
|||
Loading…
Reference in New Issue