mirror of https://github.com/grafana/grafana.git
Cleanup: Add traces to cleanup jobs (#55465)
This commit is contained in:
parent
8440baab91
commit
c10a69c007
|
|
@ -3,13 +3,18 @@ package cleanup
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/serverlock"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/services/annotations"
|
||||
"github.com/grafana/grafana/pkg/services/dashboardsnapshots"
|
||||
|
|
@ -26,7 +31,7 @@ import (
|
|||
func ProvideService(cfg *setting.Cfg, serverLockService *serverlock.ServerLockService,
|
||||
shortURLService shorturls.Service, sqlstore *sqlstore.SQLStore, queryHistoryService queryhistory.Service,
|
||||
dashboardVersionService dashver.Service, dashSnapSvc dashboardsnapshots.Service, deleteExpiredImageService *image.DeleteExpiredService,
|
||||
loginAttemptService loginattempt.Service, tempUserService tempuser.Service) *CleanUpService {
|
||||
loginAttemptService loginattempt.Service, tempUserService tempuser.Service, tracer tracing.Tracer) *CleanUpService {
|
||||
s := &CleanUpService{
|
||||
Cfg: cfg,
|
||||
ServerLockService: serverLockService,
|
||||
|
|
@ -39,12 +44,14 @@ func ProvideService(cfg *setting.Cfg, serverLockService *serverlock.ServerLockSe
|
|||
deleteExpiredImageService: deleteExpiredImageService,
|
||||
loginAttemptService: loginAttemptService,
|
||||
tempUserService: tempUserService,
|
||||
tracer: tracer,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
type CleanUpService struct {
|
||||
log log.Logger
|
||||
tracer tracing.Tracer
|
||||
store sqlstore.Store
|
||||
Cfg *setting.Cfg
|
||||
ServerLockService *serverlock.ServerLockService
|
||||
|
|
@ -57,66 +64,99 @@ type CleanUpService struct {
|
|||
tempUserService tempuser.Service
|
||||
}
|
||||
|
||||
// cleanUpJob pairs a human-readable job name with the function that performs
// the job. The name is also used as the span name when the job is traced.
type cleanUpJob struct {
	name string
	fn   func(context.Context)
}

// String returns the job name as a double-quoted Go string literal, so that a
// slice of jobs formats unambiguously (names contain spaces) when logged.
func (job cleanUpJob) String() string {
	return strconv.Quote(job.name)
}
|
||||
|
||||
func (srv *CleanUpService) Run(ctx context.Context) error {
|
||||
srv.cleanUpTmpFiles()
|
||||
srv.cleanUpTmpFiles(ctx)
|
||||
|
||||
ticker := time.NewTicker(time.Minute * 10)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
ctxWithTimeout, cancelFn := context.WithTimeout(ctx, time.Minute*9)
|
||||
defer cancelFn()
|
||||
|
||||
srv.cleanUpTmpFiles()
|
||||
srv.deleteExpiredSnapshots(ctx)
|
||||
srv.deleteExpiredDashboardVersions(ctx)
|
||||
srv.deleteExpiredImages(ctx)
|
||||
srv.cleanUpOldAnnotations(ctxWithTimeout)
|
||||
srv.expireOldUserInvites(ctx)
|
||||
srv.deleteStaleShortURLs(ctx)
|
||||
srv.deleteStaleQueryHistory(ctx)
|
||||
err := srv.ServerLockService.LockAndExecute(ctx, "delete old login attempts",
|
||||
time.Minute*10, func(context.Context) {
|
||||
srv.deleteOldLoginAttempts(ctx)
|
||||
})
|
||||
if err != nil {
|
||||
srv.log.Error("failed to lock and execute cleanup of old login attempts", "error", err)
|
||||
}
|
||||
srv.clean(ctx)
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) clean(ctx context.Context) {
|
||||
const timeout = 9 * time.Minute
|
||||
start := time.Now()
|
||||
ctx, span := srv.tracer.Start(ctx, "cleanup background job")
|
||||
defer span.End()
|
||||
ctx, cancelFn := context.WithTimeout(ctx, timeout)
|
||||
defer cancelFn()
|
||||
|
||||
cleanupJobs := []cleanUpJob{
|
||||
{"clean up temporary files", srv.cleanUpTmpFiles},
|
||||
{"delete expired snapshots", srv.deleteExpiredSnapshots},
|
||||
{"delete expired dashboard versions", srv.deleteExpiredDashboardVersions},
|
||||
{"delete expired images", srv.deleteExpiredImages},
|
||||
{"cleanup old annotations", srv.cleanUpOldAnnotations},
|
||||
{"expire old user invites", srv.expireOldUserInvites},
|
||||
{"delete stale short URLs", srv.deleteStaleShortURLs},
|
||||
{"delete stale query history", srv.deleteStaleQueryHistory},
|
||||
{"delete old login attempts", srv.deleteOldLoginAttempts},
|
||||
}
|
||||
|
||||
logger := srv.log.FromContext(ctx)
|
||||
logger.Debug("Starting cleanup jobs", "jobs", fmt.Sprintf("%v", cleanupJobs))
|
||||
|
||||
for _, j := range cleanupJobs {
|
||||
if ctx.Err() != nil {
|
||||
logger.Error("Cancelled cleanup job", "error", ctx.Err(), "duration", time.Since(start))
|
||||
return
|
||||
}
|
||||
ctx, span := srv.tracer.Start(ctx, j.name)
|
||||
j.fn(ctx)
|
||||
span.End()
|
||||
}
|
||||
|
||||
logger.Info("Completed cleanup jobs", "duration", time.Since(start))
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) cleanUpOldAnnotations(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
cleaner := annotations.GetAnnotationCleaner()
|
||||
affected, affectedTags, err := cleaner.CleanAnnotations(ctx, srv.Cfg)
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
srv.log.Error("failed to clean up old annotations", "error", err)
|
||||
logger.Error("failed to clean up old annotations", "error", err)
|
||||
} else {
|
||||
srv.log.Debug("Deleted excess annotations", "annotations affected", affected, "annotation tags affected", affectedTags)
|
||||
logger.Debug("Deleted excess annotations", "annotations affected", affected, "annotation tags affected", affectedTags)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) cleanUpTmpFiles() {
|
||||
func (srv *CleanUpService) cleanUpTmpFiles(ctx context.Context) {
|
||||
folders := []string{
|
||||
srv.Cfg.ImagesDir,
|
||||
srv.Cfg.CSVsDir,
|
||||
}
|
||||
|
||||
for _, f := range folders {
|
||||
srv.cleanUpTmpFolder(f)
|
||||
ctx, span := srv.tracer.Start(ctx, "delete stale files in temporary directory")
|
||||
span.SetAttributes("directory", f, attribute.Key("directory").String(f))
|
||||
srv.cleanUpTmpFolder(ctx, f)
|
||||
span.End()
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) cleanUpTmpFolder(folder string) {
|
||||
func (srv *CleanUpService) cleanUpTmpFolder(ctx context.Context, folder string) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
if _, err := os.Stat(folder); os.IsNotExist(err) {
|
||||
return
|
||||
}
|
||||
|
||||
files, err := os.ReadDir(folder)
|
||||
if err != nil {
|
||||
srv.log.Error("Problem reading dir", "folder", folder, "error", err)
|
||||
logger.Error("Problem reading dir", "folder", folder, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -126,7 +166,7 @@ func (srv *CleanUpService) cleanUpTmpFolder(folder string) {
|
|||
for _, file := range files {
|
||||
info, err := file.Info()
|
||||
if err != nil {
|
||||
srv.log.Error("Problem reading file", "folder", folder, "file", file, "error", err)
|
||||
logger.Error("Problem reading file", "folder", folder, "file", file, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -139,11 +179,11 @@ func (srv *CleanUpService) cleanUpTmpFolder(folder string) {
|
|||
fullPath := path.Join(folder, file.Name())
|
||||
err := os.Remove(fullPath)
|
||||
if err != nil {
|
||||
srv.log.Error("Failed to delete temp file", "file", file.Name(), "error", err)
|
||||
logger.Error("Failed to delete temp file", "file", file.Name(), "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
srv.log.Debug("Found old rendered file to delete", "folder", folder, "deleted", len(toDelete), "kept", len(files))
|
||||
logger.Debug("Found old rendered file to delete", "folder", folder, "deleted", len(toDelete), "kept", len(files))
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) shouldCleanupTempFile(filemtime time.Time, now time.Time) bool {
|
||||
|
|
@ -155,35 +195,50 @@ func (srv *CleanUpService) shouldCleanupTempFile(filemtime time.Time, now time.T
|
|||
}
|
||||
|
||||
func (srv *CleanUpService) deleteExpiredSnapshots(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
cmd := dashboardsnapshots.DeleteExpiredSnapshotsCommand{}
|
||||
if err := srv.dashboardSnapshotService.DeleteExpiredSnapshots(ctx, &cmd); err != nil {
|
||||
srv.log.Error("Failed to delete expired snapshots", "error", err.Error())
|
||||
logger.Error("Failed to delete expired snapshots", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Deleted expired snapshots", "rows affected", cmd.DeletedRows)
|
||||
logger.Debug("Deleted expired snapshots", "rows affected", cmd.DeletedRows)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteExpiredDashboardVersions(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
cmd := dashver.DeleteExpiredVersionsCommand{}
|
||||
if err := srv.dashboardVersionService.DeleteExpired(ctx, &cmd); err != nil {
|
||||
srv.log.Error("Failed to delete expired dashboard versions", "error", err.Error())
|
||||
logger.Error("Failed to delete expired dashboard versions", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Deleted old/expired dashboard versions", "rows affected", cmd.DeletedRows)
|
||||
logger.Debug("Deleted old/expired dashboard versions", "rows affected", cmd.DeletedRows)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteExpiredImages(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
if !srv.Cfg.UnifiedAlerting.IsEnabled() {
|
||||
return
|
||||
}
|
||||
if rowsAffected, err := srv.deleteExpiredImageService.DeleteExpired(ctx); err != nil {
|
||||
srv.log.Error("Failed to delete expired images", "error", err.Error())
|
||||
logger.Error("Failed to delete expired images", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Deleted expired images", "rows affected", rowsAffected)
|
||||
logger.Debug("Deleted expired images", "rows affected", rowsAffected)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteOldLoginAttempts(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
err := srv.ServerLockService.LockAndExecute(ctx, "delete old login attempts",
|
||||
time.Minute*10, func(context.Context) {
|
||||
srv.deleteOldLoginAttemptsWithoutLock(ctx)
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("failed to lock and execute cleanup of old login attempts", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteOldLoginAttemptsWithoutLock(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
if srv.Cfg.DisableBruteForceLoginProtection {
|
||||
return
|
||||
}
|
||||
|
|
@ -192,62 +247,66 @@ func (srv *CleanUpService) deleteOldLoginAttempts(ctx context.Context) {
|
|||
OlderThan: time.Now().Add(time.Minute * -10),
|
||||
}
|
||||
if err := srv.loginAttemptService.DeleteOldLoginAttempts(ctx, &cmd); err != nil {
|
||||
srv.log.Error("Problem deleting expired login attempts", "error", err.Error())
|
||||
logger.Error("Problem deleting expired login attempts", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Deleted expired login attempts", "rows affected", cmd.DeletedRows)
|
||||
logger.Debug("Deleted expired login attempts", "rows affected", cmd.DeletedRows)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) expireOldUserInvites(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
maxInviteLifetime := srv.Cfg.UserInviteMaxLifetime
|
||||
|
||||
cmd := models.ExpireTempUsersCommand{
|
||||
OlderThan: time.Now().Add(-maxInviteLifetime),
|
||||
}
|
||||
|
||||
if err := srv.tempUserService.ExpireOldUserInvites(ctx, &cmd); err != nil {
|
||||
srv.log.Error("Problem expiring user invites", "error", err.Error())
|
||||
logger.Error("Problem expiring user invites", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Expired user invites", "rows affected", cmd.NumExpired)
|
||||
logger.Debug("Expired user invites", "rows affected", cmd.NumExpired)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteStaleShortURLs(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
cmd := models.DeleteShortUrlCommand{
|
||||
OlderThan: time.Now().Add(-time.Hour * 24 * 7),
|
||||
}
|
||||
if err := srv.ShortURLService.DeleteStaleShortURLs(ctx, &cmd); err != nil {
|
||||
srv.log.Error("Problem deleting stale short urls", "error", err.Error())
|
||||
logger.Error("Problem deleting stale short urls", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Deleted short urls", "rows affected", cmd.NumDeleted)
|
||||
logger.Debug("Deleted short urls", "rows affected", cmd.NumDeleted)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *CleanUpService) deleteStaleQueryHistory(ctx context.Context) {
|
||||
logger := srv.log.FromContext(ctx)
|
||||
// Delete query history from 14+ days ago with exception of starred queries
|
||||
maxQueryHistoryLifetime := time.Hour * 24 * 14
|
||||
olderThan := time.Now().Add(-maxQueryHistoryLifetime).Unix()
|
||||
rowsCount, err := srv.QueryHistoryService.DeleteStaleQueriesInQueryHistory(ctx, olderThan)
|
||||
if err != nil {
|
||||
srv.log.Error("Problem deleting stale query history", "error", err.Error())
|
||||
logger.Error("Problem deleting stale query history", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Deleted stale query history", "rows affected", rowsCount)
|
||||
logger.Debug("Deleted stale query history", "rows affected", rowsCount)
|
||||
}
|
||||
|
||||
// Enforce 200k limit for query_history table
|
||||
queryHistoryLimit := 200000
|
||||
rowsCount, err = srv.QueryHistoryService.EnforceRowLimitInQueryHistory(ctx, queryHistoryLimit, false)
|
||||
if err != nil {
|
||||
srv.log.Error("Problem with enforcing row limit for query_history", "error", err.Error())
|
||||
logger.Error("Problem with enforcing row limit for query_history", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Enforced row limit for query_history", "rows affected", rowsCount)
|
||||
logger.Debug("Enforced row limit for query_history", "rows affected", rowsCount)
|
||||
}
|
||||
|
||||
// Enforce 150k limit for query_history_star table
|
||||
queryHistoryStarLimit := 150000
|
||||
rowsCount, err = srv.QueryHistoryService.EnforceRowLimitInQueryHistory(ctx, queryHistoryStarLimit, true)
|
||||
if err != nil {
|
||||
srv.log.Error("Problem with enforcing row limit for query_history_star", "error", err.Error())
|
||||
logger.Error("Problem with enforcing row limit for query_history_star", "error", err.Error())
|
||||
} else {
|
||||
srv.log.Debug("Enforced row limit for query_history_star", "rows affected", rowsCount)
|
||||
logger.Debug("Enforced row limit for query_history_star", "rows affected", rowsCount)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue