Provisioning: processing of jobs in job controller (#110223)

* WIP: Controller

* WIP: more changes

* Use patcher from new location

* Separate import

* Move operators to grafana/grafana

* Tidy go mod

* Remove duplicate TODO

* Wrapper for unified storage

* WIP: build unified storage client

* More attempts

* Revert update workspace

* Improve comment

* Fix linting

* Change signature of repository getter

* Add ticket numbers

* Remove question

* Read config from file for decrypt service

* Config struct for unified torage

* Add local config

* Fix compilation

* Try to configure it

* Fix linting

* Add FIXME comment

* Move reusable logic into controller config

* Remove unused

* More logic to be reused

* Extract workers into separate function

* Clean up unified storage client

* Revert a couple of files

* Remove secrets decrypter from this PR

* Revert enterprise imports

* Clean up unified storage setup logic

* Add TODO

* Revert some changes

* Remove file

* Use the expected clients

---------

Co-authored-by: Stephanie Hingtgen <stephanie.hingtgen@grafana.com>
This commit is contained in:
Roberto Jiménez Sánchez 2025-09-05 20:28:31 +02:00 committed by GitHub
parent 8e8c36203f
commit ed2273b2d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 111 additions and 14 deletions

View File

@ -15,6 +15,11 @@ import (
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/export"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/migrate"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/move"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/sync"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/services/apiserver/standalone"
"github.com/grafana/grafana/pkg/setting"
@ -23,6 +28,8 @@ import (
"github.com/grafana/grafana/apps/provisioning/pkg/controller"
informer "github.com/grafana/grafana/apps/provisioning/pkg/generated/informers/externalversions"
"github.com/grafana/grafana/apps/provisioning/pkg/repository"
deletepkg "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/delete"
)
func RunJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cfg) error {
@ -31,7 +38,7 @@ func RunJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cf
})).With("logger", "provisioning-job-controller")
logger.Info("Starting provisioning job controller")
controllerCfg, err := getJobsControllerConfig(cfg)
controllerCfg, err := setupJobsControllerFromConfig(cfg)
if err != nil {
return fmt.Errorf("failed to setup operator: %w", err)
}
@ -64,17 +71,6 @@ func RunJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cf
}
logger.Info("jobs controller started")
notifications := jobController.InsertNotifications()
go func() {
for {
select {
case <-ctx.Done():
return
case <-notifications:
logger.Info("job create notification received")
}
}
}()
var startHistoryInformers func()
if controllerCfg.historyExpiration > 0 {
@ -97,6 +93,55 @@ func RunJobController(opts standalone.BuildInfo, c *cli.Context, cfg *setting.Cf
} else {
startHistoryInformers = func() {}
}
// HistoryWriter can be either Loki or the API server
// TODO: Loki configuration and setup in the same way we do for the API server
// https://github.com/grafana/git-ui-sync-project/issues/508
// var jobHistoryWriter jobs.HistoryWriter
// if b.jobHistoryLoki != nil {
// jobHistoryWriter = b.jobHistoryLoki
// } else {
// jobHistoryWriter = jobs.NewAPIClientHistoryWriter(provisioningClient.ProvisioningV0alpha1())
// }
jobHistoryWriter := jobs.NewAPIClientHistoryWriter(controllerCfg.provisioningClient.ProvisioningV0alpha1())
jobStore, err := jobs.NewJobStore(controllerCfg.provisioningClient.ProvisioningV0alpha1(), 30*time.Second)
if err != nil {
return fmt.Errorf("create API client job store: %w", err)
}
workers, err := setupWorkers(controllerCfg)
if err != nil {
return fmt.Errorf("setup workers: %w", err)
}
repoGetter := resources.NewRepositoryGetter(
controllerCfg.repoFactory,
controllerCfg.provisioningClient.ProvisioningV0alpha1(),
)
// This is basically our own JobQueue system
driver, err := jobs.NewConcurrentJobDriver(
3, // 3 drivers for now
20*time.Minute, // Max time for each job
time.Minute, // Cleanup jobs
30*time.Second, // Periodically look for new jobs
30*time.Second, // Lease renewal interval
jobStore,
repoGetter,
jobHistoryWriter,
jobController.InsertNotifications(),
workers...,
)
if err != nil {
return fmt.Errorf("create concurrent job driver: %w", err)
}
go func() {
logger.Info("jobs controller started")
if err := driver.Run(ctx); err != nil {
logger.Error("job driver failed", "error", err)
}
}()
// Start informers
go jobInformerFactory.Start(ctx.Done())
@ -116,18 +161,70 @@ type jobsControllerConfig struct {
historyExpiration time.Duration
}
func getJobsControllerConfig(cfg *setting.Cfg) (*jobsControllerConfig, error) {
func setupJobsControllerFromConfig(cfg *setting.Cfg) (*jobsControllerConfig, error) {
controllerCfg, err := setupFromConfig(cfg)
if err != nil {
return nil, err
}
return &jobsControllerConfig{
provisioningControllerConfig: *controllerCfg,
historyExpiration: cfg.SectionWithEnvOverrides("operator").Key("history_expiration").MustDuration(0),
}, nil
}
// Use unified storage and API clients for testing purposes.
func setupWorkers(controllerCfg *jobsControllerConfig) ([]jobs.Worker, error) {
clients := controllerCfg.clients
parsers := resources.NewParserFactory(clients)
resourceLister := resources.NewResourceLister(controllerCfg.unified)
repositoryResources := resources.NewRepositoryResourcesFactory(parsers, clients, resourceLister)
statusPatcher := controller.NewRepositoryStatusPatcher(controllerCfg.provisioningClient.ProvisioningV0alpha1())
workers := make([]jobs.Worker, 0)
// Sync
syncer := sync.NewSyncer(sync.Compare, sync.FullSync, sync.IncrementalSync)
syncWorker := sync.NewSyncWorker(
clients,
repositoryResources,
nil, // HACK: we have updated the worker to check for nil
statusPatcher.Patch,
syncer,
)
workers = append(workers, syncWorker)
// Export
stageIfPossible := repository.WrapWithStageAndPushIfPossible
exportWorker := export.NewExportWorker(
clients,
repositoryResources,
export.ExportAll,
stageIfPossible,
)
workers = append(workers, exportWorker)
// Migrate
cleaner := migrate.NewNamespaceCleaner(clients)
unifiedStorageMigrator := migrate.NewUnifiedStorageMigrator(
cleaner,
exportWorker,
syncWorker,
)
migrationWorker := migrate.NewMigrationWorkerFromUnified(unifiedStorageMigrator)
workers = append(workers, migrationWorker)
// Delete
deleteWorker := deletepkg.NewWorker(syncWorker, stageIfPossible, repositoryResources)
workers = append(workers, deleteWorker)
// Move
moveWorker := move.NewWorker(syncWorker, stageIfPossible, repositoryResources)
workers = append(workers, moveWorker)
return workers, nil
}
// Use unified storage client for testing purposes.
// TODO: remove this once the processing logic is in place
// https://github.com/grafana/git-ui-sync-project/issues/467
func temporaryPeriodicTestClients(ctx context.Context, logger logging.Logger, controllerCfg *jobsControllerConfig) {