diff --git a/pkg/operators/provisioning/jobs_operator.go b/pkg/operators/provisioning/jobs_operator.go index 434ea3f7635..7877e98f425 100644 --- a/pkg/operators/provisioning/jobs_operator.go +++ b/pkg/operators/provisioning/jobs_operator.go @@ -15,6 +15,11 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/apimachinery/utils" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/export" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/migrate" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/move" + "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/sync" "github.com/grafana/grafana/pkg/registry/apis/provisioning/resources" "github.com/grafana/grafana/pkg/services/apiserver/standalone" "github.com/grafana/grafana/pkg/setting" @@ -23,6 +28,8 @@ import ( "github.com/grafana/grafana/apps/provisioning/pkg/controller" informer "github.com/grafana/grafana/apps/provisioning/pkg/generated/informers/externalversions" + "github.com/grafana/grafana/apps/provisioning/pkg/repository" + deletepkg "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/delete" ) func RunJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cfg) error { @@ -31,7 +38,7 @@ func RunJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cf })).With("logger", "provisioning-job-controller") logger.Info("Starting provisioning job controller") - controllerCfg, err := getJobsControllerConfig(cfg) + controllerCfg, err := setupJobsControllerFromConfig(cfg) if err != nil { return fmt.Errorf("failed to setup operator: %w", err) } @@ -64,17 +71,6 @@ func RunJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cf } logger.Info("jobs controller started") - notifications := jobController.InsertNotifications() - go func() { - for { - select { - case <-ctx.Done(): - return - case <-notifications: - logger.Info("job create notification received") - } - } - }() var startHistoryInformers func() if controllerCfg.historyExpiration > 0 { @@ -97,6 +93,55 @@ func RunJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cf } else { startHistoryInformers = func() {} } + // HistoryWriter can be either Loki or the API server + // TODO: Loki configuration and setup in the same way we do for the API server + // https://github.com/grafana/git-ui-sync-project/issues/508 + // var jobHistoryWriter jobs.HistoryWriter + // if b.jobHistoryLoki != nil { + // jobHistoryWriter = b.jobHistoryLoki + // } else { + // jobHistoryWriter = jobs.NewAPIClientHistoryWriter(provisioningClient.ProvisioningV0alpha1()) + // } + + jobHistoryWriter := jobs.NewAPIClientHistoryWriter(controllerCfg.provisioningClient.ProvisioningV0alpha1()) + jobStore, err := jobs.NewJobStore(controllerCfg.provisioningClient.ProvisioningV0alpha1(), 30*time.Second) + if err != nil { + return fmt.Errorf("create API client job store: %w", err) + } + + workers, err := setupWorkers(controllerCfg) + if err != nil { + return fmt.Errorf("setup workers: %w", err) + } + + repoGetter := resources.NewRepositoryGetter( + controllerCfg.repoFactory, + controllerCfg.provisioningClient.ProvisioningV0alpha1(), + ) + + // This is basically our own JobQueue system + driver, err := jobs.NewConcurrentJobDriver( + 3, // 3 drivers for now + 20*time.Minute, // Max time for each job + time.Minute, // Cleanup jobs + 30*time.Second, // Periodically look for new jobs + 30*time.Second, // Lease renewal interval + jobStore, + repoGetter, + jobHistoryWriter, + jobController.InsertNotifications(), + workers..., + ) + if err != nil { + return fmt.Errorf("create concurrent job driver: %w", err) + } + + go func() { + logger.Info("jobs controller started") + if err := driver.Run(ctx); err != nil { + logger.Error("job driver failed", "error", err) + } + }() // Start informers go jobInformerFactory.Start(ctx.Done()) @@ -116,18 +161,70 @@ type jobsControllerConfig struct { historyExpiration time.Duration } -func getJobsControllerConfig(cfg *setting.Cfg) (*jobsControllerConfig, error) { +func setupJobsControllerFromConfig(cfg *setting.Cfg) (*jobsControllerConfig, error) { controllerCfg, err := setupFromConfig(cfg) if err != nil { return nil, err } + return &jobsControllerConfig{ provisioningControllerConfig: *controllerCfg, historyExpiration: cfg.SectionWithEnvOverrides("operator").Key("history_expiration").MustDuration(0), }, nil } -// Use unified storage and API clients for testing purposes. +func setupWorkers(controllerCfg *jobsControllerConfig) ([]jobs.Worker, error) { + clients := controllerCfg.clients + parsers := resources.NewParserFactory(clients) + resourceLister := resources.NewResourceLister(controllerCfg.unified) + repositoryResources := resources.NewRepositoryResourcesFactory(parsers, clients, resourceLister) + statusPatcher := controller.NewRepositoryStatusPatcher(controllerCfg.provisioningClient.ProvisioningV0alpha1()) + + workers := make([]jobs.Worker, 0) + + // Sync + syncer := sync.NewSyncer(sync.Compare, sync.FullSync, sync.IncrementalSync) + syncWorker := sync.NewSyncWorker( + clients, + repositoryResources, + nil, // HACK: we have updated the worker to check for nil + statusPatcher.Patch, + syncer, + ) + workers = append(workers, syncWorker) + + // Export + stageIfPossible := repository.WrapWithStageAndPushIfPossible + exportWorker := export.NewExportWorker( + clients, + repositoryResources, + export.ExportAll, + stageIfPossible, + ) + workers = append(workers, exportWorker) + + // Migrate + cleaner := migrate.NewNamespaceCleaner(clients) + unifiedStorageMigrator := migrate.NewUnifiedStorageMigrator( + cleaner, + exportWorker, + syncWorker, + ) + migrationWorker := migrate.NewMigrationWorkerFromUnified(unifiedStorageMigrator) + workers = append(workers, migrationWorker) + + // Delete + deleteWorker := deletepkg.NewWorker(syncWorker, stageIfPossible, repositoryResources) + workers = append(workers, deleteWorker) + + // Move + moveWorker := move.NewWorker(syncWorker, stageIfPossible, repositoryResources) + workers = append(workers, moveWorker) + + return workers, nil +} + +// Use unified storage client for testing purposes. // TODO: remove this once the processing logic is in place // https://github.com/grafana/git-ui-sync-project/issues/467 func temporaryPeriodicTestClients(ctx context.Context, logger logging.Logger, controllerCfg *jobsControllerConfig) {