diff --git a/pkg/registry/apis/provisioning/jobs.go b/pkg/registry/apis/provisioning/jobs.go index 8c352bbeee1..a1fc5d7648b 100644 --- a/pkg/registry/apis/provisioning/jobs.go +++ b/pkg/registry/apis/provisioning/jobs.go @@ -18,7 +18,7 @@ import ( type jobsConnector struct { repoGetter RepoGetter jobs jobs.Queue - historic jobs.History + historic jobs.HistoryReader } func (*jobsConnector) New() runtime.Object { diff --git a/pkg/registry/apis/provisioning/jobs/concurrent_driver.go b/pkg/registry/apis/provisioning/jobs/concurrent_driver.go index 4f508c25a8d..0504fc414d5 100644 --- a/pkg/registry/apis/provisioning/jobs/concurrent_driver.go +++ b/pkg/registry/apis/provisioning/jobs/concurrent_driver.go @@ -18,7 +18,7 @@ type ConcurrentJobDriver struct { leaseRenewalInterval time.Duration store Store repoGetter RepoGetter - historicJobs History + historicJobs HistoryWriter workers []Worker notifications chan struct{} } @@ -29,7 +29,7 @@ func NewConcurrentJobDriver( jobTimeout, cleanupInterval, jobInterval, leaseRenewalInterval time.Duration, store Store, repoGetter RepoGetter, - historicJobs History, + historicJobs HistoryWriter, notifications chan struct{}, workers ...Worker, ) (*ConcurrentJobDriver, error) { diff --git a/pkg/registry/apis/provisioning/jobs/driver.go b/pkg/registry/apis/provisioning/jobs/driver.go index f64eebe3528..cd186f3151f 100644 --- a/pkg/registry/apis/provisioning/jobs/driver.go +++ b/pkg/registry/apis/provisioning/jobs/driver.go @@ -68,7 +68,7 @@ type jobDriver struct { repoGetter RepoGetter // save info about finished jobs - historicJobs History + historicJobs HistoryWriter // Workers process the job. // Only the first worker who supports the job will process it; the rest are ignored. @@ -82,7 +82,7 @@ func NewJobDriver( jobTimeout, jobInterval, leaseRenewalInterval time.Duration, store Store, repoGetter RepoGetter, - historicJobs History, + historicJobs HistoryWriter, notifications chan struct{}, workers ...Worker, ) (*jobDriver, error) { diff --git a/pkg/registry/apis/provisioning/jobs/history_mock.go b/pkg/registry/apis/provisioning/jobs/history_mock.go deleted file mode 100644 index f6697d33bcf..00000000000 --- a/pkg/registry/apis/provisioning/jobs/history_mock.go +++ /dev/null @@ -1,205 +0,0 @@ -// Code generated by mockery v2.53.4. DO NOT EDIT. - -package jobs - -import ( - context "context" - - v0alpha1 "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1" - mock "github.com/stretchr/testify/mock" -) - -// MockHistory is an autogenerated mock type for the History type -type MockHistory struct { - mock.Mock -} - -type MockHistory_Expecter struct { - mock *mock.Mock -} - -func (_m *MockHistory) EXPECT() *MockHistory_Expecter { - return &MockHistory_Expecter{mock: &_m.Mock} -} - -// GetJob provides a mock function with given fields: ctx, namespace, repo, uid -func (_m *MockHistory) GetJob(ctx context.Context, namespace string, repo string, uid string) (*v0alpha1.Job, error) { - ret := _m.Called(ctx, namespace, repo, uid) - - if len(ret) == 0 { - panic("no return value specified for GetJob") - } - - var r0 *v0alpha1.Job - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, string) (*v0alpha1.Job, error)); ok { - return rf(ctx, namespace, repo, uid) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string, string) *v0alpha1.Job); ok { - r0 = rf(ctx, namespace, repo, uid) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*v0alpha1.Job) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string, string) error); ok { - r1 = rf(ctx, namespace, repo, uid) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockHistory_GetJob_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetJob' -type MockHistory_GetJob_Call struct { - *mock.Call -} - -// GetJob is a helper method to define mock.On call -// - ctx context.Context -// - namespace string -// - repo string -// - uid string -func (_e *MockHistory_Expecter) GetJob(ctx interface{}, namespace interface{}, repo interface{}, uid interface{}) *MockHistory_GetJob_Call { - return &MockHistory_GetJob_Call{Call: _e.mock.On("GetJob", ctx, namespace, repo, uid)} -} - -func (_c *MockHistory_GetJob_Call) Run(run func(ctx context.Context, namespace string, repo string, uid string)) *MockHistory_GetJob_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string)) - }) - return _c -} - -func (_c *MockHistory_GetJob_Call) Return(_a0 *v0alpha1.Job, _a1 error) *MockHistory_GetJob_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockHistory_GetJob_Call) RunAndReturn(run func(context.Context, string, string, string) (*v0alpha1.Job, error)) *MockHistory_GetJob_Call { - _c.Call.Return(run) - return _c -} - -// RecentJobs provides a mock function with given fields: ctx, namespace, repo -func (_m *MockHistory) RecentJobs(ctx context.Context, namespace string, repo string) (*v0alpha1.JobList, error) { - ret := _m.Called(ctx, namespace, repo) - - if len(ret) == 0 { - panic("no return value specified for RecentJobs") - } - - var r0 *v0alpha1.JobList - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) (*v0alpha1.JobList, error)); ok { - return rf(ctx, namespace, repo) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string) *v0alpha1.JobList); ok { - r0 = rf(ctx, namespace, repo) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*v0alpha1.JobList) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, namespace, repo) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockHistory_RecentJobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecentJobs' -type MockHistory_RecentJobs_Call struct { - *mock.Call -} - -// RecentJobs is a helper method to define mock.On call -// - ctx context.Context -// - namespace string -// - repo string -func (_e *MockHistory_Expecter) RecentJobs(ctx interface{}, namespace interface{}, repo interface{}) *MockHistory_RecentJobs_Call { - return &MockHistory_RecentJobs_Call{Call: _e.mock.On("RecentJobs", ctx, namespace, repo)} -} - -func (_c *MockHistory_RecentJobs_Call) Run(run func(ctx context.Context, namespace string, repo string)) *MockHistory_RecentJobs_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *MockHistory_RecentJobs_Call) Return(_a0 *v0alpha1.JobList, _a1 error) *MockHistory_RecentJobs_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockHistory_RecentJobs_Call) RunAndReturn(run func(context.Context, string, string) (*v0alpha1.JobList, error)) *MockHistory_RecentJobs_Call { - _c.Call.Return(run) - return _c -} - -// WriteJob provides a mock function with given fields: ctx, job -func (_m *MockHistory) WriteJob(ctx context.Context, job *v0alpha1.Job) error { - ret := _m.Called(ctx, job) - - if len(ret) == 0 { - panic("no return value specified for WriteJob") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *v0alpha1.Job) error); ok { - r0 = rf(ctx, job) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockHistory_WriteJob_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteJob' -type MockHistory_WriteJob_Call struct { - *mock.Call -} - -// WriteJob is a helper method to define mock.On call -// - ctx context.Context -// - job *v0alpha1.Job -func (_e *MockHistory_Expecter) WriteJob(ctx interface{}, job interface{}) *MockHistory_WriteJob_Call { - return &MockHistory_WriteJob_Call{Call: _e.mock.On("WriteJob", ctx, job)} -} - -func (_c *MockHistory_WriteJob_Call) Run(run func(ctx context.Context, job *v0alpha1.Job)) *MockHistory_WriteJob_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*v0alpha1.Job)) - }) - return _c -} - -func (_c *MockHistory_WriteJob_Call) Return(_a0 error) *MockHistory_WriteJob_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockHistory_WriteJob_Call) RunAndReturn(run func(context.Context, *v0alpha1.Job) error) *MockHistory_WriteJob_Call { - _c.Call.Return(run) - return _c -} - -// NewMockHistory creates a new instance of MockHistory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockHistory(t interface { - mock.TestingT - Cleanup(func()) -}) *MockHistory { - mock := &MockHistory{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/pkg/registry/apis/provisioning/jobs/history.go b/pkg/registry/apis/provisioning/jobs/history_reader.go similarity index 63% rename from pkg/registry/apis/provisioning/jobs/history.go rename to pkg/registry/apis/provisioning/jobs/history_reader.go index 95fc4b73b3d..ac54ffe8c69 100644 --- a/pkg/registry/apis/provisioning/jobs/history.go +++ b/pkg/registry/apis/provisioning/jobs/history_reader.go @@ -6,7 +6,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/internalversion" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" @@ -14,13 +13,10 @@ import ( provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1" ) -// History keeps track of completed jobs +// HistoryReader keeps track of completed jobs // -//go:generate mockery --name History --structname MockHistory --inpackage --filename history_mock.go --with-expecter -type History interface { - // Adds a job to the history - WriteJob(ctx context.Context, job *provisioning.Job) error - +//go:generate mockery --name HistoryReader --structname MockHistoryReader --inpackage --filename history_reader_mock.go --with-expecter +type HistoryReader interface { // Gets recent jobs for a repository RecentJobs(ctx context.Context, namespace, repo string) (*provisioning.JobList, error) @@ -30,14 +26,9 @@ type History interface { // NewStorageBackedHistory creates a History client backed by unified storage // This should be replaced by loki when running in cloud -func NewStorageBackedHistory(store rest.Storage) (History, error) { +func NewStorageBackedHistory(store rest.Storage) (HistoryReader, error) { var ok bool history := &storageBackedHistory{} - history.creator, ok = store.(rest.Creater) - if !ok { - return nil, fmt.Errorf("storage does not implement rest.Creater") - } - history.lister, ok = store.(rest.Lister) if !ok { return nil, fmt.Errorf("storage does not implement rest.Lister") @@ -47,35 +38,7 @@ func NewStorageBackedHistory(store rest.Storage) (History, error) { } type storageBackedHistory struct { - creator rest.Creater - lister rest.Lister -} - -// Write implements History. -func (s *storageBackedHistory) WriteJob(ctx context.Context, job *provisioning.Job) error { - if job.UID == "" { - return fmt.Errorf("missing UID in job '%s'", job.GetName()) - } - if job.Labels == nil { - job.Labels = make(map[string]string) - } - job.Labels[LabelRepository] = job.Spec.Repository - job.Labels[LabelJobOriginalUID] = string(job.UID) - - // Generate a new name based on the input job - job.GenerateName = job.Name + "-" - job.Name = "" - // We also reset the UID as this is not the same object. - job.UID = "" - // We aren't allowed to write with ResourceVersion set. - job.ResourceVersion = "" - - _, err := s.creator.Create(ctx, &provisioning.HistoricJob{ - ObjectMeta: job.ObjectMeta, - Spec: job.Spec, - Status: job.Status, - }, nil, &metav1.CreateOptions{}) - return err + lister rest.Lister } func (s *storageBackedHistory) getJobs(ctx context.Context, namespace string, labels labels.Set) (*provisioning.JobList, error) { diff --git a/pkg/registry/apis/provisioning/jobs/history_writer.go b/pkg/registry/apis/provisioning/jobs/history_writer.go new file mode 100644 index 00000000000..67a9b427006 --- /dev/null +++ b/pkg/registry/apis/provisioning/jobs/history_writer.go @@ -0,0 +1,68 @@ +package jobs + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1" + client "github.com/grafana/grafana/apps/provisioning/pkg/generated/clientset/versioned/typed/provisioning/v0alpha1" +) + +// HistoryWriter stores completed jobs +// +//go:generate mockery --name=HistoryWriter --structname=MockHistoryWriter --inpackage --filename history_writer_mock.go --with-expecter +type HistoryWriter interface { + // Adds a job to the history + WriteJob(ctx context.Context, job *provisioning.Job) error +} + +type apiClientHistoryWriter struct { + client client.ProvisioningV0alpha1Interface +} + +// NewAPIClientHistoryWriter creates a HistoryWriter backed by the provisioning API client. +func NewAPIClientHistoryWriter(provisioningClient client.ProvisioningV0alpha1Interface) HistoryWriter { + return &apiClientHistoryWriter{ + client: provisioningClient, + } +} + +// WriteJob implements HistoryWriter. +func (w *apiClientHistoryWriter) WriteJob(ctx context.Context, job *provisioning.Job) error { + if job.UID == "" { + return fmt.Errorf("missing UID in job '%s'", job.GetName()) + } + + // Create a copy of the job's metadata to avoid modifying the original + meta := job.ObjectMeta.DeepCopy() + + // Ensure labels map exists + if meta.Labels == nil { + meta.Labels = make(map[string]string) + } + + // Add required labels for history tracking + meta.Labels[LabelRepository] = job.Spec.Repository + meta.Labels[LabelJobOriginalUID] = string(job.UID) + + // Generate a new name based on the input job + meta.GenerateName = job.Name + "-" + meta.Name = "" + // We also reset the UID as this is not the same object. + meta.UID = "" + // We aren't allowed to write with ResourceVersion set. + meta.ResourceVersion = "" + + // Create the historic job using the API client + historicJob := &provisioning.HistoricJob{ + ObjectMeta: *meta, + Spec: job.Spec, + Status: job.Status, + } + + _, err := w.client.HistoricJobs(job.Namespace).Create(ctx, historicJob, metav1.CreateOptions{}) + + return err +} diff --git a/pkg/registry/apis/provisioning/register.go b/pkg/registry/apis/provisioning/register.go index d4bd0d21825..658f0779d5c 100644 --- a/pkg/registry/apis/provisioning/register.go +++ b/pkg/registry/apis/provisioning/register.go @@ -99,7 +99,7 @@ type APIBuilder struct { jobs.Store } jobHistoryConfig *JobHistoryConfig - jobHistory jobs.History + jobHistoryLoki *jobs.LokiJobHistory resourceLister resources.ResourceLister repositoryLister listers.RepositoryLister legacyMigrator legacy.LegacyMigrator @@ -441,28 +441,27 @@ func (b *APIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver.APIGroupI storage := map[string]rest.Storage{} // Create job history based on configuration - // Default to in-memory cache if no config provided - var jobHistory jobs.History + // Default to unified storage if no config provided + var jobHistory jobs.HistoryReader if b.jobHistoryConfig != nil && b.jobHistoryConfig.Loki != nil { - jobHistory = jobs.NewLokiJobHistory(*b.jobHistoryConfig.Loki) + b.jobHistoryLoki = jobs.NewLokiJobHistory(*b.jobHistoryConfig.Loki) + jobHistory = b.jobHistoryLoki } else { historicJobStore, err := grafanaregistry.NewCompleteRegistryStore(opts.Scheme, provisioning.HistoricJobResourceInfo, opts.OptsGetter) if err != nil { - return fmt.Errorf("failed to create historic job storage: %w", err) + return fmt.Errorf("create historic job storage: %w", err) } jobHistory, err = jobs.NewStorageBackedHistory(historicJobStore) if err != nil { - return fmt.Errorf("failed to create historic job wrapper: %w", err) + return fmt.Errorf("create historic job wrapper: %w", err) } - storage[provisioning.HistoricJobResourceInfo.StoragePath()] = historicJobStore } - b.jobHistory = jobHistory b.jobs, err = jobs.NewJobStore(realJobStore, 30*time.Second) // FIXME: this timeout if err != nil { - return fmt.Errorf("failed to create job store: %w", err) + return fmt.Errorf("create job store: %w", err) } // Although we never interact with jobs via the API, we want them to be readable (watchable!) from the API. @@ -485,7 +484,7 @@ func (b *APIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver.APIGroupI storage[provisioning.RepositoryResourceInfo.StoragePath("jobs")] = &jobsConnector{ repoGetter: b, jobs: b.jobs, - historic: b.jobHistory, + historic: jobHistory, } // Add any extra storage @@ -507,6 +506,12 @@ func (b *APIBuilder) Mutate(ctx context.Context, a admission.Attributes, o admis return nil // This is normal for sub-resource } + // FIXME: Do nothing for HistoryJobs for now + _, ok := obj.(*provisioning.HistoricJob) + if ok { + return nil + } + r, ok := obj.(*provisioning.Repository) if !ok { return fmt.Errorf("expected repository configuration") @@ -551,6 +556,12 @@ func (b *APIBuilder) Validate(ctx context.Context, a admission.Attributes, o adm return nil } + // FIXME: Do nothing for HistoryJobs for now + _, ok := obj.(*provisioning.HistoricJob) + if ok { + return nil + } + repo, err := b.asRepository(ctx, obj) if err != nil { return err @@ -743,6 +754,13 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH workers = append(workers, extra.GetJobWorkers()...) } + var jobHistoryWriter jobs.HistoryWriter + if b.jobHistoryLoki != nil { + jobHistoryWriter = b.jobHistoryLoki + } else { + jobHistoryWriter = jobs.NewAPIClientHistoryWriter(b.GetClient()) + } + // This is basically our own JobQueue system driver, err := jobs.NewConcurrentJobDriver( 3, // 3 drivers for now @@ -750,7 +768,7 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH time.Minute, // Cleanup jobs 30*time.Second, // Periodically look for new jobs 30*time.Second, // Lease renewal interval - b.jobs, b, b.jobHistory, + b.jobs, b, jobHistoryWriter, jobController.InsertNotifications(), workers..., ) @@ -783,8 +801,8 @@ func (b *APIBuilder) GetPostStartHooks() (map[string]genericapiserver.PostStartH go repoController.Run(postStartHookCtx.Context, repoControllerWorkers) - // If Loki not used, start the controller for history jobs - if b.jobHistoryConfig == nil || b.jobHistoryConfig.Loki == nil { + // If Loki not used, initialize the API client-based history writer and start the controller for history jobs + if b.jobHistoryLoki == nil { // Create HistoryJobController for cleanup of old job history entries // Separate informer factory for HistoryJob cleanup with resync interval historyJobExpiration := 30 * time.Second