// grafana/pkg/operators/provisioning/jobs_operator.go

package provisioning

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/grafana/grafana-app-sdk/logging"
	"github.com/prometheus/client_golang/prometheus"
	"k8s.io/client-go/tools/cache"

	"github.com/grafana/grafana/apps/provisioning/pkg/controller"
	informer "github.com/grafana/grafana/apps/provisioning/pkg/generated/informers/externalversions"
	"github.com/grafana/grafana/apps/provisioning/pkg/repository"
	"github.com/grafana/grafana/pkg/infra/tracing"
	"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
	deletepkg "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/delete"
	"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/export"
	"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/migrate"
	"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/move"
	"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/sync"
	"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
	"github.com/grafana/grafana/pkg/server"
	"github.com/grafana/grafana/pkg/setting"
)
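
// RunJobController is the entry point for the standalone provisioning jobs
// operator: it wires up tracing, the provisioning API client, the job and
// (optional) history-job informers, the job workers, and the concurrent job
// driver, then blocks until the process receives SIGINT or SIGTERM.
//
// A minimal calling sketch, assuming server.OperatorDependencies only needs the
// Config and Registerer fields that are used below (other fields are omitted):
//
//	deps := server.OperatorDependencies{
//		Config:     cfg,                      // *setting.Cfg loaded by the host process
//		Registerer: prometheus.NewRegistry(), // or the process-wide registerer
//	}
//	if err := RunJobController(deps); err != nil {
//		// a non-nil error means setup failed; the normal shutdown path returns nil
//	}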
func RunJobController(deps server.OperatorDependencies) error {
	logger := logging.NewSLogLogger(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelDebug,
	})).With("logger", "provisioning-job-controller")
	logger.Info("Starting provisioning job controller")

	tracingConfig, err := tracing.ProvideTracingConfig(deps.Config)
	if err != nil {
		return fmt.Errorf("failed to provide tracing config: %w", err)
	}

	tracer, err := tracing.ProvideService(tracingConfig)
	if err != nil {
		return fmt.Errorf("failed to provide tracing service: %w", err)
	}

	controllerCfg, err := setupJobsControllerFromConfig(deps.Config, deps.Registerer)
	if err != nil {
		return fmt.Errorf("failed to setup operator: %w", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		fmt.Println("Received shutdown signal, stopping controllers")
		cancel()
	}()

	// Jobs informer and controller (resync ~60s like in register.go)
	jobInformerFactory := informer.NewSharedInformerFactoryWithOptions(
		controllerCfg.provisioningClient,
		controllerCfg.resyncInterval,
	)
	jobInformer := jobInformerFactory.Provisioning().V0alpha1().Jobs()
	jobController, err := controller.NewJobController(jobInformer)
	if err != nil {
		return fmt.Errorf("failed to create job controller: %w", err)
	}
logger.Info("jobs controller started")

	var startHistoryInformers func()
	if controllerCfg.historyExpiration > 0 {
		// History jobs informer and controller (separate factory with resync == expiration)
		historyInformerFactory := informer.NewSharedInformerFactoryWithOptions(
			controllerCfg.provisioningClient,
			controllerCfg.historyExpiration,
		)
		historyJobInformer := historyInformerFactory.Provisioning().V0alpha1().HistoricJobs()
		_, err = controller.NewHistoryJobController(
			controllerCfg.provisioningClient.ProvisioningV0alpha1(),
			historyJobInformer,
			controllerCfg.historyExpiration,
		)
		if err != nil {
			return fmt.Errorf("failed to create history job controller: %w", err)
		}
		logger.Info("history cleanup enabled", "expiration", controllerCfg.historyExpiration.String())
		startHistoryInformers = func() { historyInformerFactory.Start(ctx.Done()) }
	} else {
		startHistoryInformers = func() {}
	}

	// HistoryWriter can be either Loki or the API server
	// TODO: Loki configuration and setup in the same way we do for the API server
	// https://github.com/grafana/git-ui-sync-project/issues/508
	// var jobHistoryWriter jobs.HistoryWriter
	// if b.jobHistoryLoki != nil {
	// 	jobHistoryWriter = b.jobHistoryLoki
	// } else {
	// 	jobHistoryWriter = jobs.NewAPIClientHistoryWriter(provisioningClient.ProvisioningV0alpha1())
	// }
	jobHistoryWriter := jobs.NewAPIClientHistoryWriter(controllerCfg.provisioningClient.ProvisioningV0alpha1())

	jobStore, err := jobs.NewJobStore(controllerCfg.provisioningClient.ProvisioningV0alpha1(), 30*time.Second, deps.Registerer)
	if err != nil {
		return fmt.Errorf("create API client job store: %w", err)
	}

	workers, err := setupWorkers(controllerCfg, deps.Registerer, tracer)
	if err != nil {
		return fmt.Errorf("setup workers: %w", err)
	}

	repoGetter := resources.NewRepositoryGetter(
		controllerCfg.repoFactory,
		controllerCfg.provisioningClient.ProvisioningV0alpha1(),
	)

	// This is basically our own JobQueue system
	driver, err := jobs.NewConcurrentJobDriver(
		controllerCfg.concurrentDrivers,
		controllerCfg.maxJobTimeout,
		controllerCfg.cleanupInterval,
		controllerCfg.jobInterval,
		controllerCfg.leaseRenewalInterval,
		jobStore,
		repoGetter,
		jobHistoryWriter,
		jobController.InsertNotifications(),
		deps.Registerer,
		workers...,
	)
	if err != nil {
		return fmt.Errorf("create concurrent job driver: %w", err)
	}

	go func() {
		logger.Info("jobs controller started")
		if err := driver.Run(ctx); err != nil {
			logger.Error("job driver failed", "error", err)
		}
	}()

	// Start informers
	go jobInformerFactory.Start(ctx.Done())
	go startHistoryInformers()

	// Wait for the job informer cache to sync; history cleanup can rely on resync events instead
	if !cache.WaitForCacheSync(ctx.Done(), jobInformer.Informer().HasSynced) {
		return fmt.Errorf("failed to sync job informer cache")
	}

	<-ctx.Done()
	return nil
}
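
// jobsControllerConfig extends the shared provisioning controller configuration
// with the knobs that are specific to the jobs operator: how long finished
// history jobs are kept, how many concurrent job drivers run, and the driver
// timeouts and intervals read from the [operator] section of the config.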
type jobsControllerConfig struct {
	provisioningControllerConfig

	historyExpiration    time.Duration
	maxJobTimeout        time.Duration
	cleanupInterval      time.Duration
	jobInterval          time.Duration
	leaseRenewalInterval time.Duration
	concurrentDrivers    int
}
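
// setupJobsControllerFromConfig reads the jobs-operator settings from the
// [operator] section (with environment-variable overrides), falling back to the
// defaults below when a key is unset. As an illustration only, a custom
// configuration might look like this; the values shown are hypothetical, not
// recommendations:
//
//	[operator]
//	history_expiration     = 24h
//	concurrent_drivers     = 5
//	max_job_timeout        = 30m
//	cleanup_interval       = 1m
//	job_interval           = 15s
//	lease_renewal_interval = 15s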
func setupJobsControllerFromConfig(cfg *setting.Cfg, registry prometheus.Registerer) (*jobsControllerConfig, error) {
	controllerCfg, err := setupFromConfig(cfg, registry)
	if err != nil {
		return nil, err
	}

	return &jobsControllerConfig{
		provisioningControllerConfig: *controllerCfg,
		historyExpiration:            cfg.SectionWithEnvOverrides("operator").Key("history_expiration").MustDuration(0),
		concurrentDrivers:            cfg.SectionWithEnvOverrides("operator").Key("concurrent_drivers").MustInt(3),
		maxJobTimeout:                cfg.SectionWithEnvOverrides("operator").Key("max_job_timeout").MustDuration(20 * time.Minute),
		cleanupInterval:              cfg.SectionWithEnvOverrides("operator").Key("cleanup_interval").MustDuration(time.Minute),
		jobInterval:                  cfg.SectionWithEnvOverrides("operator").Key("job_interval").MustDuration(30 * time.Second),
		leaseRenewalInterval:         cfg.SectionWithEnvOverrides("operator").Key("lease_renewal_interval").MustDuration(30 * time.Second),
	}, nil
}
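
// setupWorkers builds the job workers handled by the driver: sync, export,
// migrate, delete, and move. The sync worker is reused by the migrate, delete,
// and move workers, and the export worker is reused by the migrate worker.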
func setupWorkers(controllerCfg *jobsControllerConfig, registry prometheus.Registerer, tracer tracing.Tracer) ([]jobs.Worker, error) {
	clients := controllerCfg.clients
	parsers := resources.NewParserFactory(clients)
	resourceLister := resources.NewResourceLister(controllerCfg.unified)
	repositoryResources := resources.NewRepositoryResourcesFactory(parsers, clients, resourceLister)
	statusPatcher := controller.NewRepositoryStatusPatcher(controllerCfg.provisioningClient.ProvisioningV0alpha1())

	workers := make([]jobs.Worker, 0)
	metrics := jobs.RegisterJobMetrics(registry)

	// Sync
	syncer := sync.NewSyncer(sync.Compare, sync.FullSync, sync.IncrementalSync, tracer)
	syncWorker := sync.NewSyncWorker(
		clients,
		repositoryResources,
		nil, // HACK: we have updated the worker to check for nil
		statusPatcher.Patch,
		syncer,
		metrics,
		tracer,
	)
	workers = append(workers, syncWorker)

	// Export
	stageIfPossible := repository.WrapWithStageAndPushIfPossible
	exportWorker := export.NewExportWorker(
		clients,
		repositoryResources,
		export.ExportAll,
		stageIfPossible,
		metrics,
	)
	workers = append(workers, exportWorker)

	// Migrate
	cleaner := migrate.NewNamespaceCleaner(clients)
	unifiedStorageMigrator := migrate.NewUnifiedStorageMigrator(
		cleaner,
		exportWorker,
		syncWorker,
	)
	migrationWorker := migrate.NewMigrationWorkerFromUnified(unifiedStorageMigrator)
	workers = append(workers, migrationWorker)

	// Delete
	deleteWorker := deletepkg.NewWorker(syncWorker, stageIfPossible, repositoryResources, metrics)
	workers = append(workers, deleteWorker)

	// Move
	moveWorker := move.NewWorker(syncWorker, stageIfPossible, repositoryResources, metrics)
	workers = append(workers, moveWorker)

	return workers, nil
}