Provisioning: Use loki for recent jobs (#109249)

This commit is contained in:
Roberto Jiménez Sánchez 2025-08-06 20:09:28 +02:00 committed by GitHub
parent fec9cd550a
commit 0b5fc9a736
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 1537 additions and 8 deletions

View File

@ -0,0 +1,144 @@
// Code generated by mockery v2.52.4. DO NOT EDIT.
package jobs
import (
context "context"
loki "github.com/grafana/grafana/pkg/registry/apis/provisioning/loki"
mock "github.com/stretchr/testify/mock"
)
// MockLokiClient is an autogenerated mock type for the LokiClient type
type MockLokiClient struct {
mock.Mock
}
type MockLokiClient_Expecter struct {
mock *mock.Mock
}
func (_m *MockLokiClient) EXPECT() *MockLokiClient_Expecter {
return &MockLokiClient_Expecter{mock: &_m.Mock}
}
// Push provides a mock function with given fields: _a0, _a1
func (_m *MockLokiClient) Push(_a0 context.Context, _a1 []loki.Stream) error {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for Push")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []loki.Stream) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockLokiClient_Push_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Push'
type MockLokiClient_Push_Call struct {
*mock.Call
}
// Push is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 []loki.Stream
func (_e *MockLokiClient_Expecter) Push(_a0 interface{}, _a1 interface{}) *MockLokiClient_Push_Call {
return &MockLokiClient_Push_Call{Call: _e.mock.On("Push", _a0, _a1)}
}
func (_c *MockLokiClient_Push_Call) Run(run func(_a0 context.Context, _a1 []loki.Stream)) *MockLokiClient_Push_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]loki.Stream))
})
return _c
}
func (_c *MockLokiClient_Push_Call) Return(_a0 error) *MockLokiClient_Push_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockLokiClient_Push_Call) RunAndReturn(run func(context.Context, []loki.Stream) error) *MockLokiClient_Push_Call {
_c.Call.Return(run)
return _c
}
// RangeQuery provides a mock function with given fields: ctx, logQL, start, end, limit
func (_m *MockLokiClient) RangeQuery(ctx context.Context, logQL string, start int64, end int64, limit int64) (loki.QueryRes, error) {
ret := _m.Called(ctx, logQL, start, end, limit)
if len(ret) == 0 {
panic("no return value specified for RangeQuery")
}
var r0 loki.QueryRes
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, int64, int64, int64) (loki.QueryRes, error)); ok {
return rf(ctx, logQL, start, end, limit)
}
if rf, ok := ret.Get(0).(func(context.Context, string, int64, int64, int64) loki.QueryRes); ok {
r0 = rf(ctx, logQL, start, end, limit)
} else {
r0 = ret.Get(0).(loki.QueryRes)
}
if rf, ok := ret.Get(1).(func(context.Context, string, int64, int64, int64) error); ok {
r1 = rf(ctx, logQL, start, end, limit)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockLokiClient_RangeQuery_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RangeQuery'
type MockLokiClient_RangeQuery_Call struct {
*mock.Call
}
// RangeQuery is a helper method to define mock.On call
// - ctx context.Context
// - logQL string
// - start int64
// - end int64
// - limit int64
func (_e *MockLokiClient_Expecter) RangeQuery(ctx interface{}, logQL interface{}, start interface{}, end interface{}, limit interface{}) *MockLokiClient_RangeQuery_Call {
return &MockLokiClient_RangeQuery_Call{Call: _e.mock.On("RangeQuery", ctx, logQL, start, end, limit)}
}
func (_c *MockLokiClient_RangeQuery_Call) Run(run func(ctx context.Context, logQL string, start int64, end int64, limit int64)) *MockLokiClient_RangeQuery_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(int64), args[3].(int64), args[4].(int64))
})
return _c
}
func (_c *MockLokiClient_RangeQuery_Call) Return(_a0 loki.QueryRes, _a1 error) *MockLokiClient_RangeQuery_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockLokiClient_RangeQuery_Call) RunAndReturn(run func(context.Context, string, int64, int64, int64) (loki.QueryRes, error)) *MockLokiClient_RangeQuery_Call {
_c.Call.Return(run)
return _c
}
// NewMockLokiClient creates a new instance of MockLokiClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockLokiClient(t interface {
mock.TestingT
Cleanup(func())
}) *MockLokiClient {
mock := &MockLokiClient{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,224 @@
package jobs
import (
"context"
"encoding/json"
"fmt"
"sort"
"time"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/loki"
apierrors "k8s.io/apimachinery/pkg/api/errors"
provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1"
)
const (
// Loki label keys
JobHistoryLabelKey = "from"
JobHistoryLabelValue = "job-history"
NamespaceLabel = "namespace"
RepositoryLabel = "repository"
LokiJobSpanName = "provisioning.job.historian.client"
)
const (
// Default query settings
defaultJobQueryRange = 24 * time.Hour // 1 days
maxJobsLimit = 10 // Maximum jobs to return per repository
)
//go:generate mockery --name LokiClient --structname MockLokiClient --inpackage --filename loki_client_mock.go --with-expecter
type LokiClient interface {
Push(context.Context, []loki.Stream) error
RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (loki.QueryRes, error)
}
// LokiJobHistory implements the History interface using Loki for storage
type LokiJobHistory struct {
client LokiClient
externalLabels map[string]string
}
// NewLokiJobHistory creates a new Loki-based job history implementation
func NewLokiJobHistory(cfg loki.Config) *LokiJobHistory {
return &LokiJobHistory{
client: loki.NewClient(cfg),
externalLabels: cfg.ExternalLabels,
}
}
// WriteJob implements History.WriteJob by storing the job in Loki
func (h *LokiJobHistory) WriteJob(ctx context.Context, job *provisioning.Job) error {
logger := logging.FromContext(ctx)
// Clean up the job copy (remove claim label, similar to in-memory implementation)
jobCopy := job.DeepCopy()
delete(jobCopy.Labels, LabelJobClaim)
// Create Loki stream
stream := h.jobToStream(ctx, jobCopy)
if len(stream.Values) == 0 {
return nil
}
// Push to Loki synchronously
writeCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
logger.Debug("Saving job history to Loki", "namespace", jobCopy.Namespace, "repository", jobCopy.Spec.Repository, "job", jobCopy.Name)
if err := h.client.Push(writeCtx, []loki.Stream{stream}); err != nil {
logger.Error("Failed to save job history to Loki", "error", err)
return fmt.Errorf("failed to save job history: %w", err)
}
logger.Debug("Successfully saved job history to Loki")
return nil
}
// RecentJobs implements History.RecentJobs by querying Loki for recent jobs
func (h *LokiJobHistory) RecentJobs(ctx context.Context, namespace, repo string) (*provisioning.JobList, error) {
logger := logging.FromContext(ctx)
// Build LogQL query
logQL := h.buildJobQuery(namespace, repo)
// Query time range (last 30 days by default)
now := time.Now().UTC()
from := now.Add(-defaultJobQueryRange)
logger.Debug("Querying Loki for recent jobs", "namespace", namespace, "repository", repo, "query", logQL)
// Execute query
result, err := h.client.RangeQuery(ctx, logQL, from.UnixNano(), now.UnixNano(), int64(maxJobsLimit))
if err != nil {
return nil, fmt.Errorf("failed to query job history: %w", err)
}
// Convert result to JobList
jobList, err := h.resultToJobList(result)
if err != nil {
return nil, fmt.Errorf("failed to parse job history results: %w", err)
}
logger.Debug("Retrieved jobs from Loki", "count", len(jobList.Items))
return jobList, nil
}
// GetJob implements History.GetJob by finding a specific job
func (h *LokiJobHistory) GetJob(ctx context.Context, namespace, repo, uid string) (*provisioning.Job, error) {
// Get recent jobs and find the specific one
jobs, err := h.RecentJobs(ctx, namespace, repo)
if err != nil {
return nil, err
}
// Search for job by UID
for _, job := range jobs.Items {
if string(job.UID) == uid {
return &job, nil
}
}
return nil, apierrors.NewNotFound(provisioning.JobResourceInfo.GroupResource(), uid)
}
// jobToStream converts a Job to a Loki stream
func (h *LokiJobHistory) jobToStream(ctx context.Context, job *provisioning.Job) loki.Stream {
logger := logging.FromContext(ctx)
// Create stream labels
labels := make(map[string]string)
// Add external labels
for k, v := range h.externalLabels {
labels[k] = v
}
// Add system labels
labels[JobHistoryLabelKey] = JobHistoryLabelValue
labels[NamespaceLabel] = job.Namespace
labels[RepositoryLabel] = job.Spec.Repository
// Serialize job to JSON
jobJSON, err := json.Marshal(job)
if err != nil {
logger.Error("Failed to marshal job to JSON", "error", err, "job", job.Name)
return loki.Stream{Stream: labels, Values: []loki.Sample{}}
}
// Create timestamp (use finished time if available, otherwise creation time)
timestamp := job.CreationTimestamp.Time
if job.Status.Finished > 0 {
// Status timestamps are in milliseconds
timestamp = time.Unix(0, job.Status.Finished*int64(time.Millisecond))
} else if job.Status.Started > 0 {
// Status timestamps are in milliseconds
timestamp = time.Unix(0, job.Status.Started*int64(time.Millisecond))
}
// Create sample
sample := loki.Sample{
T: timestamp,
V: string(jobJSON),
}
return loki.Stream{
Stream: labels,
Values: []loki.Sample{sample},
}
}
// buildJobQuery creates a LogQL query for jobs
func (h *LokiJobHistory) buildJobQuery(namespace, repo string) string {
return fmt.Sprintf(`{%s=%q,%s=%q,%s=%q}`,
JobHistoryLabelKey, JobHistoryLabelValue,
NamespaceLabel, namespace,
RepositoryLabel, repo,
)
}
// resultToJobList converts Loki query results to a JobList
func (h *LokiJobHistory) resultToJobList(result loki.QueryRes) (*provisioning.JobList, error) {
var jobs []provisioning.Job
// Extract jobs from all streams
for _, stream := range result.Data.Result {
for _, sample := range stream.Values {
var job provisioning.Job
if err := json.Unmarshal([]byte(sample.V), &job); err != nil {
// Unable to log here without context, just continue to next sample
continue
}
jobs = append(jobs, job)
}
}
// Sort jobs by timestamp (most recent first)
sort.Slice(jobs, func(i, j int) bool {
timeI := h.getJobTimestamp(&jobs[i])
timeJ := h.getJobTimestamp(&jobs[j])
return timeI.After(timeJ)
})
// Limit to maxJobs (similar to in-memory implementation: 10 jobs)
if len(jobs) > 10 {
jobs = jobs[:10]
}
return &provisioning.JobList{
Items: jobs,
}, nil
}
// getJobTimestamp returns the most relevant timestamp for sorting
func (h *LokiJobHistory) getJobTimestamp(job *provisioning.Job) time.Time {
if job.Status.Finished > 0 {
return time.Unix(job.Status.Finished, 0)
}
if job.Status.Started > 0 {
return time.Unix(job.Status.Started, 0)
}
return job.CreationTimestamp.Time
}

View File

@ -0,0 +1,602 @@
package jobs
import (
"context"
"encoding/json"
"errors"
"net/url"
"testing"
"time"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/loki"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1"
)
func TestLokiJobHistory_WriteJob(t *testing.T) {
// Create comprehensive test job with all spec and status fields
job := &provisioning.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job",
Namespace: "test-namespace",
UID: types.UID("test-uid"),
CreationTimestamp: metav1.NewTime(time.Now()),
Labels: map[string]string{
"test": "label",
LabelJobClaim: "should-be-removed",
"env": "production",
},
Annotations: map[string]string{
"description": "Test job for validation",
},
},
Spec: provisioning.JobSpec{
Action: provisioning.JobActionPull,
Repository: "test-repo",
Pull: &provisioning.SyncJobOptions{
Incremental: true,
},
Push: &provisioning.ExportJobOptions{
Message: "Export test",
Folder: "dashboards",
Branch: "main",
Path: "/exported",
},
Migrate: &provisioning.MigrateJobOptions{
History: true,
Message: "Migration test",
},
Delete: &provisioning.DeleteJobOptions{
Ref: "main",
Paths: []string{"/old/dashboard.json", "/old/folder/"},
Resources: []provisioning.ResourceRef{{
Name: "dashboard-uid",
Kind: "Dashboard",
Group: "dashboard.grafana.app",
}},
},
Move: &provisioning.MoveJobOptions{
Ref: "feature-branch",
Paths: []string{"/src/dashboard.json"},
TargetPath: "/dest/",
Resources: []provisioning.ResourceRef{{
Name: "moved-dashboard",
Kind: "Dashboard",
Group: "dashboard.grafana.app",
}},
},
PullRequest: &provisioning.PullRequestJobOptions{
Ref: "feature-123",
PR: 123,
Hash: "abc123def456",
URL: "https://github.com/org/repo/pull/123",
},
},
Status: provisioning.JobStatus{
State: provisioning.JobStateSuccess,
Started: time.Now().UnixMilli() - 100000, // 100 seconds ago in milliseconds
Finished: time.Now().UnixMilli(), // Now in milliseconds
Message: "Job completed successfully",
Errors: []string{"warning: deprecated field used"},
Progress: 100.0,
Summary: []*provisioning.JobResourceSummary{{
Group: "dashboard.grafana.app",
Resource: "dashboards",
Total: 10,
Create: 3,
Update: 5,
Delete: 1,
Write: 8,
Error: 1,
Noop: 0,
Errors: []string{"failed to process dashboard-x"},
}},
},
}
t.Run("jobToStream creates correct stream with all fields", func(t *testing.T) {
history := createTestLokiJobHistory(t)
// Clean job copy like WriteJob does
jobCopy := job.DeepCopy()
delete(jobCopy.Labels, LabelJobClaim)
stream := history.jobToStream(context.Background(), jobCopy)
// Verify labels
assert.Equal(t, JobHistoryLabelValue, stream.Stream[JobHistoryLabelKey])
assert.Equal(t, job.Namespace, stream.Stream[NamespaceLabel])
assert.Equal(t, job.Spec.Repository, stream.Stream[RepositoryLabel])
assert.Equal(t, "test-value", stream.Stream["test-key"]) // external label
// Verify we have a sample
require.Len(t, stream.Values, 1)
// Verify timestamp (should use finished time converted from milliseconds)
expectedTime := time.Unix(0, job.Status.Finished*int64(time.Millisecond))
assert.Equal(t, expectedTime, stream.Values[0].T)
// Verify job data is JSON and contains all fields
var deserializedJob provisioning.Job
err := json.Unmarshal([]byte(stream.Values[0].V), &deserializedJob)
require.NoError(t, err)
// Verify metadata fields
assert.Equal(t, "test-job", deserializedJob.Name)
assert.Equal(t, "test-namespace", deserializedJob.Namespace)
assert.Equal(t, types.UID("test-uid"), deserializedJob.UID)
assert.Equal(t, "production", deserializedJob.Labels["env"])
assert.Equal(t, "Test job for validation", deserializedJob.Annotations["description"])
// Verify claim label was removed
_, exists := deserializedJob.Labels[LabelJobClaim]
assert.False(t, exists)
// Verify spec fields
assert.Equal(t, provisioning.JobActionPull, deserializedJob.Spec.Action)
assert.Equal(t, "test-repo", deserializedJob.Spec.Repository)
require.NotNil(t, deserializedJob.Spec.Pull)
assert.True(t, deserializedJob.Spec.Pull.Incremental)
require.NotNil(t, deserializedJob.Spec.Push)
assert.Equal(t, "Export test", deserializedJob.Spec.Push.Message)
assert.Equal(t, "dashboards", deserializedJob.Spec.Push.Folder)
assert.Equal(t, "main", deserializedJob.Spec.Push.Branch)
assert.Equal(t, "/exported", deserializedJob.Spec.Push.Path)
require.NotNil(t, deserializedJob.Spec.Migrate)
assert.True(t, deserializedJob.Spec.Migrate.History)
assert.Equal(t, "Migration test", deserializedJob.Spec.Migrate.Message)
require.NotNil(t, deserializedJob.Spec.Delete)
assert.Equal(t, "main", deserializedJob.Spec.Delete.Ref)
assert.Equal(t, []string{"/old/dashboard.json", "/old/folder/"}, deserializedJob.Spec.Delete.Paths)
require.Len(t, deserializedJob.Spec.Delete.Resources, 1)
assert.Equal(t, "dashboard-uid", deserializedJob.Spec.Delete.Resources[0].Name)
assert.Equal(t, "Dashboard", deserializedJob.Spec.Delete.Resources[0].Kind)
assert.Equal(t, "dashboard.grafana.app", deserializedJob.Spec.Delete.Resources[0].Group)
require.NotNil(t, deserializedJob.Spec.Move)
assert.Equal(t, "feature-branch", deserializedJob.Spec.Move.Ref)
assert.Equal(t, []string{"/src/dashboard.json"}, deserializedJob.Spec.Move.Paths)
assert.Equal(t, "/dest/", deserializedJob.Spec.Move.TargetPath)
require.Len(t, deserializedJob.Spec.Move.Resources, 1)
assert.Equal(t, "moved-dashboard", deserializedJob.Spec.Move.Resources[0].Name)
require.NotNil(t, deserializedJob.Spec.PullRequest)
assert.Equal(t, "feature-123", deserializedJob.Spec.PullRequest.Ref)
assert.Equal(t, 123, deserializedJob.Spec.PullRequest.PR)
assert.Equal(t, "abc123def456", deserializedJob.Spec.PullRequest.Hash)
assert.Equal(t, "https://github.com/org/repo/pull/123", deserializedJob.Spec.PullRequest.URL)
// Verify status fields
assert.Equal(t, provisioning.JobStateSuccess, deserializedJob.Status.State)
assert.Equal(t, job.Status.Started, deserializedJob.Status.Started)
assert.Equal(t, job.Status.Finished, deserializedJob.Status.Finished)
assert.Equal(t, "Job completed successfully", deserializedJob.Status.Message)
assert.Equal(t, []string{"warning: deprecated field used"}, deserializedJob.Status.Errors)
assert.Equal(t, 100.0, deserializedJob.Status.Progress)
require.Len(t, deserializedJob.Status.Summary, 1)
summary := deserializedJob.Status.Summary[0]
assert.Equal(t, "dashboard.grafana.app", summary.Group)
assert.Equal(t, "dashboards", summary.Resource)
assert.Equal(t, int64(10), summary.Total)
assert.Equal(t, int64(3), summary.Create)
assert.Equal(t, int64(5), summary.Update)
assert.Equal(t, int64(1), summary.Delete)
assert.Equal(t, int64(8), summary.Write)
assert.Equal(t, int64(1), summary.Error)
assert.Equal(t, int64(0), summary.Noop)
assert.Equal(t, []string{"failed to process dashboard-x"}, summary.Errors)
})
t.Run("buildJobQuery creates correct LogQL", func(t *testing.T) {
history := createTestLokiJobHistory(t)
query := history.buildJobQuery("test-ns", "test-repo")
expected := `{from="job-history",namespace="test-ns",repository="test-repo"}`
assert.Equal(t, expected, query)
})
t.Run("getJobTimestamp returns correct timestamp", func(t *testing.T) {
history := createTestLokiJobHistory(t)
// Test finished time priority
jobWithFinished := &provisioning.Job{
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.NewTime(time.Unix(100, 0)),
},
Status: provisioning.JobStatus{
Started: 200,
Finished: 300,
},
}
ts := history.getJobTimestamp(jobWithFinished)
assert.Equal(t, time.Unix(300, 0), ts)
// Test started time when no finished time
jobWithStarted := &provisioning.Job{
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.NewTime(time.Unix(100, 0)),
},
Status: provisioning.JobStatus{
Started: 200,
},
}
ts = history.getJobTimestamp(jobWithStarted)
assert.Equal(t, time.Unix(200, 0), ts)
// Test creation time when no other timestamps
jobWithCreation := &provisioning.Job{
ObjectMeta: metav1.ObjectMeta{
CreationTimestamp: metav1.NewTime(time.Unix(100, 0)),
},
}
ts = history.getJobTimestamp(jobWithCreation)
assert.Equal(t, time.Unix(100, 0), ts)
})
}
func TestLokiJobHistory_Integration(t *testing.T) {
// Create comprehensive test job with all spec and status fields for integration tests
integrationJob := &provisioning.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "integration-job",
Namespace: "test-namespace",
UID: types.UID("integration-uid"),
CreationTimestamp: metav1.NewTime(time.Now()),
Labels: map[string]string{
"test": "integration",
LabelJobClaim: "should-be-removed",
},
},
Spec: provisioning.JobSpec{
Action: provisioning.JobActionPull,
Repository: "test-repo",
},
Status: provisioning.JobStatus{
State: provisioning.JobStateSuccess,
Started: time.Now().UnixMilli() - 100000,
Finished: time.Now().UnixMilli(),
Message: "Integration test completed",
},
}
t.Run("WriteJob with mock validates complete flow", func(t *testing.T) {
mockClient := NewMockLokiClient(t)
history := &LokiJobHistory{
client: mockClient,
externalLabels: map[string]string{
"service": "grafana-provisioning",
},
}
// Set up mock expectation
mockClient.EXPECT().Push(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(streams []loki.Stream) bool {
// Validate the stream structure
if len(streams) != 1 {
return false
}
stream := streams[0]
// Check labels
if stream.Stream[JobHistoryLabelKey] != JobHistoryLabelValue {
return false
}
if stream.Stream[NamespaceLabel] != "test-namespace" {
return false
}
if stream.Stream[RepositoryLabel] != "test-repo" {
return false
}
if stream.Stream["service"] != "grafana-provisioning" {
return false
}
// Check we have values
if len(stream.Values) != 1 {
return false
}
// Validate JSON content
var deserializedJob provisioning.Job
if err := json.Unmarshal([]byte(stream.Values[0].V), &deserializedJob); err != nil {
return false
}
// Check key fields are preserved
return deserializedJob.Name == "integration-job" &&
deserializedJob.Spec.Action == provisioning.JobActionPull &&
deserializedJob.Status.State == provisioning.JobStateSuccess
}),
).Return(nil)
// Execute WriteJob (using the integration job)
err := history.WriteJob(context.Background(), integrationJob)
require.NoError(t, err)
})
t.Run("WriteJob handles push errors", func(t *testing.T) {
// Create a simple job for this test
testJob := &provisioning.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "error-job",
Namespace: "test-namespace",
UID: types.UID("error-uid"),
},
Spec: provisioning.JobSpec{
Action: provisioning.JobActionPull,
Repository: "test-repo",
},
Status: provisioning.JobStatus{
State: provisioning.JobStateError,
},
}
mockClient := NewMockLokiClient(t)
history := &LokiJobHistory{
client: mockClient,
externalLabels: map[string]string{},
}
// Set up mock to return error
mockClient.EXPECT().Push(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(streams []loki.Stream) bool { return true }),
).Return(errors.New("loki push failed"))
// Execute WriteJob and expect error
err := history.WriteJob(context.Background(), testJob)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to save job history")
assert.Contains(t, err.Error(), "loki push failed")
})
}
func TestLokiJobHistory_RecentJobs(t *testing.T) {
t.Run("RecentJobs returns parsed jobs from Loki", func(t *testing.T) {
mockClient := NewMockLokiClient(t)
history := &LokiJobHistory{
client: mockClient,
externalLabels: map[string]string{},
}
// Create test job JSON data
testJob := &provisioning.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "recent-job",
Namespace: "test-ns",
UID: types.UID("recent-uid"),
},
Spec: provisioning.JobSpec{
Action: provisioning.JobActionPull,
Repository: "test-repo",
},
Status: provisioning.JobStatus{
State: provisioning.JobStateSuccess,
Started: time.Now().UnixMilli() - 50000,
Finished: time.Now().UnixMilli(),
Message: "Success",
},
}
jobJSON, _ := json.Marshal(testJob)
// Mock Loki response
mockResult := loki.QueryRes{
Data: loki.QueryData{
Result: []loki.Stream{{
Stream: map[string]string{
JobHistoryLabelKey: JobHistoryLabelValue,
NamespaceLabel: "test-ns",
RepositoryLabel: "test-repo",
},
Values: []loki.Sample{{
T: time.Now(),
V: string(jobJSON),
}},
}},
},
}
// Set up mock expectation
mockClient.EXPECT().RangeQuery(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
`{from="job-history",namespace="test-ns",repository="test-repo"}`,
mock.MatchedBy(func(start int64) bool { return start > 0 }),
mock.MatchedBy(func(end int64) bool { return end > 0 }),
int64(10),
).Return(mockResult, nil)
// Execute RecentJobs
result, err := history.RecentJobs(context.Background(), "test-ns", "test-repo")
require.NoError(t, err)
require.NotNil(t, result)
require.Len(t, result.Items, 1)
// Verify returned job
returnedJob := result.Items[0]
assert.Equal(t, "recent-job", returnedJob.Name)
assert.Equal(t, "test-ns", returnedJob.Namespace)
assert.Equal(t, types.UID("recent-uid"), returnedJob.UID)
assert.Equal(t, provisioning.JobActionPull, returnedJob.Spec.Action)
assert.Equal(t, "test-repo", returnedJob.Spec.Repository)
assert.Equal(t, provisioning.JobStateSuccess, returnedJob.Status.State)
})
t.Run("RecentJobs handles Loki query errors", func(t *testing.T) {
mockClient := NewMockLokiClient(t)
history := &LokiJobHistory{
client: mockClient,
externalLabels: map[string]string{},
}
// Set up mock to return error
mockClient.EXPECT().RangeQuery(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(query string) bool { return true }),
mock.MatchedBy(func(start int64) bool { return true }),
mock.MatchedBy(func(end int64) bool { return true }),
mock.MatchedBy(func(limit int64) bool { return true }),
).Return(loki.QueryRes{}, errors.New("loki query failed"))
// Execute RecentJobs and expect error
result, err := history.RecentJobs(context.Background(), "test-ns", "test-repo")
require.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "failed to query job history")
assert.Contains(t, err.Error(), "loki query failed")
})
t.Run("RecentJobs handles invalid JSON gracefully", func(t *testing.T) {
mockClient := NewMockLokiClient(t)
history := &LokiJobHistory{
client: mockClient,
externalLabels: map[string]string{},
}
// Mock Loki response with invalid JSON
mockResult := loki.QueryRes{
Data: loki.QueryData{
Result: []loki.Stream{{
Stream: map[string]string{
JobHistoryLabelKey: JobHistoryLabelValue,
NamespaceLabel: "test-ns",
RepositoryLabel: "test-repo",
},
Values: []loki.Sample{{
T: time.Now(),
V: "invalid-json",
}},
}},
},
}
// Set up mock expectation
mockClient.EXPECT().RangeQuery(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(query string) bool { return true }),
mock.MatchedBy(func(start int64) bool { return true }),
mock.MatchedBy(func(end int64) bool { return true }),
mock.MatchedBy(func(limit int64) bool { return true }),
).Return(mockResult, nil)
// Execute RecentJobs - should handle invalid JSON gracefully
result, err := history.RecentJobs(context.Background(), "test-ns", "test-repo")
require.NoError(t, err)
require.NotNil(t, result)
// Invalid JSON entries should be skipped
assert.Len(t, result.Items, 0)
})
}
func TestLokiJobHistory_GetJob(t *testing.T) {
t.Run("GetJob finds job by UID", func(t *testing.T) {
mockClient := NewMockLokiClient(t)
history := &LokiJobHistory{
client: mockClient,
externalLabels: map[string]string{},
}
// Create test jobs
job1 := &provisioning.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job-1",
Namespace: "test-ns",
UID: types.UID("uid-1"),
},
Spec: provisioning.JobSpec{Action: provisioning.JobActionPull, Repository: "test-repo"},
Status: provisioning.JobStatus{State: provisioning.JobStateSuccess},
}
job2 := &provisioning.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job-2",
Namespace: "test-ns",
UID: types.UID("target-uid"),
},
Spec: provisioning.JobSpec{Action: provisioning.JobActionPush, Repository: "test-repo"},
Status: provisioning.JobStatus{State: provisioning.JobStateSuccess},
}
job1JSON, _ := json.Marshal(job1)
job2JSON, _ := json.Marshal(job2)
// Mock Loki response with multiple jobs
mockResult := loki.QueryRes{
Data: loki.QueryData{
Result: []loki.Stream{{
Values: []loki.Sample{
{T: time.Now(), V: string(job1JSON)},
{T: time.Now(), V: string(job2JSON)},
},
}},
},
}
// Set up mock expectation
mockClient.EXPECT().RangeQuery(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(query string) bool { return true }),
mock.MatchedBy(func(start int64) bool { return true }),
mock.MatchedBy(func(end int64) bool { return true }),
mock.MatchedBy(func(limit int64) bool { return true }),
).Return(mockResult, nil)
// Execute GetJob
result, err := history.GetJob(context.Background(), "test-ns", "test-repo", "target-uid")
require.NoError(t, err)
require.NotNil(t, result)
// Verify correct job was returned
assert.Equal(t, "job-2", result.Name)
assert.Equal(t, types.UID("target-uid"), result.UID)
assert.Equal(t, provisioning.JobActionPush, result.Spec.Action)
})
t.Run("GetJob returns NotFound for missing UID", func(t *testing.T) {
mockClient := NewMockLokiClient(t)
history := &LokiJobHistory{
client: mockClient,
externalLabels: map[string]string{},
}
// Mock empty Loki response
mockResult := loki.QueryRes{
Data: loki.QueryData{
Result: []loki.Stream{},
},
}
// Set up mock expectation
mockClient.EXPECT().RangeQuery(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(query string) bool { return true }),
mock.MatchedBy(func(start int64) bool { return true }),
mock.MatchedBy(func(end int64) bool { return true }),
mock.MatchedBy(func(limit int64) bool { return true }),
).Return(mockResult, nil)
// Execute GetJob
result, err := history.GetJob(context.Background(), "test-ns", "test-repo", "missing-uid")
require.Error(t, err)
assert.Nil(t, result)
// Verify it's a NotFound error
assert.True(t, apierrors.IsNotFound(err))
})
}
// createTestLokiJobHistory creates a LokiJobHistory for testing
func createTestLokiJobHistory(t *testing.T) *LokiJobHistory {
// Create test URLs
readURL, _ := url.Parse("http://localhost:3100")
writeURL, _ := url.Parse("http://localhost:3100")
config := loki.Config{
ReadPathURL: readURL,
WritePathURL: writeURL,
ExternalLabels: map[string]string{
"test-key": "test-value",
},
MaxQuerySize: 1000,
}
return NewLokiJobHistory(config)
}

View File

@ -0,0 +1,185 @@
package loki
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"time"
"github.com/grafana/grafana-app-sdk/logging"
)
type Config struct {
ReadPathURL *url.URL
WritePathURL *url.URL
BasicAuthUser string
BasicAuthPassword string
TenantID string
ExternalLabels map[string]string
MaxQuerySize int
}
type Stream struct {
Stream map[string]string `json:"stream"`
Values []Sample `json:"values"`
}
type Sample struct {
T time.Time
V string
}
func (r Sample) MarshalJSON() ([]byte, error) {
return json.Marshal([2]string{
fmt.Sprintf("%d", r.T.UnixNano()), r.V,
})
}
func (r *Sample) UnmarshalJSON(b []byte) error {
var tuple [2]string
if err := json.Unmarshal(b, &tuple); err != nil {
return fmt.Errorf("failed to deserialize sample in Loki response: %w", err)
}
nano, err := strconv.ParseInt(tuple[0], 10, 64)
if err != nil {
return fmt.Errorf("timestamp in Loki sample not convertible to nanosecond epoch: %v", tuple[0])
}
r.T = time.Unix(0, nano)
r.V = tuple[1]
return nil
}
type QueryRes struct {
Data QueryData `json:"data"`
}
type QueryData struct {
Result []Stream `json:"result"`
}
type PushRequest struct {
Streams []Stream `json:"streams"`
}
type Client struct {
cfg Config
client *http.Client
}
func NewClient(cfg Config) *Client {
return &Client{
cfg: cfg,
client: &http.Client{Timeout: 30 * time.Second},
}
}
func (c *Client) Push(ctx context.Context, streams []Stream) error {
log := logging.FromContext(ctx)
pushReq := PushRequest{Streams: streams}
body, err := json.Marshal(pushReq)
if err != nil {
return fmt.Errorf("failed to marshal push request: %w", err)
}
uri := c.cfg.WritePathURL.JoinPath("/loki/api/v1/push")
req, err := http.NewRequest(http.MethodPost, uri.String(), bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to create Loki request: %w", err)
}
c.setAuthAndTenantHeaders(req)
req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx)
res, err := c.client.Do(req)
if res != nil {
defer func() {
if err := res.Body.Close(); err != nil {
log.Warn("Failed to close response body", "err", err)
}
}()
}
if err != nil {
return fmt.Errorf("error sending request: %w", err)
}
if res.StatusCode < 200 || res.StatusCode >= 300 {
body, _ := io.ReadAll(res.Body)
log.Error("Error response from Loki", "response", string(body), "status", res.StatusCode)
return fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
}
log.Debug("Successfully pushed streams to Loki", "status", res.StatusCode, "streams", len(streams))
return nil
}
func (c *Client) RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error) {
log := logging.FromContext(ctx)
uri := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range")
req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
if err != nil {
return QueryRes{}, fmt.Errorf("error creating request: %w", err)
}
q := req.URL.Query()
q.Set("query", logQL)
q.Set("start", strconv.FormatInt(start, 10))
q.Set("end", strconv.FormatInt(end, 10))
if limit > 0 {
q.Set("limit", strconv.FormatInt(limit, 10))
}
req.URL.RawQuery = q.Encode()
c.setAuthAndTenantHeaders(req)
req = req.WithContext(ctx)
res, err := c.client.Do(req)
if res != nil {
defer func() {
if err := res.Body.Close(); err != nil {
log.Warn("Failed to close response body", "err", err)
}
}()
}
if err != nil {
return QueryRes{}, fmt.Errorf("error sending request: %w", err)
}
body, err := io.ReadAll(res.Body)
if err != nil {
return QueryRes{}, fmt.Errorf("error reading request response: %w", err)
}
if res.StatusCode < 200 || res.StatusCode >= 300 {
if len(body) > 0 {
log.Error("Error response from Loki", "response", string(body), "status", res.StatusCode)
} else {
log.Error("Error response from Loki with an empty body", "status", res.StatusCode)
}
return QueryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode)
}
var queryRes QueryRes
if err := json.Unmarshal(body, &queryRes); err != nil {
return QueryRes{}, fmt.Errorf("error unmarshaling loki response: %w", err)
}
log.Debug("Successfully queried Loki", "status", res.StatusCode, "streams", len(queryRes.Data.Result))
return queryRes, nil
}
func (c *Client) setAuthAndTenantHeaders(req *http.Request) {
if c.cfg.BasicAuthUser != "" || c.cfg.BasicAuthPassword != "" {
req.SetBasicAuth(c.cfg.BasicAuthUser, c.cfg.BasicAuthPassword)
}
if c.cfg.TenantID != "" {
req.Header.Set("X-Scope-OrgID", c.cfg.TenantID)
}
}

View File

@ -0,0 +1,314 @@
package loki
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSample_MarshalJSON(t *testing.T) {
sample := Sample{
T: time.Unix(0, 1234567890000000000), // 1234567890 seconds in nanoseconds
V: "test log line",
}
data, err := json.Marshal(sample)
require.NoError(t, err)
expected := `["1234567890000000000","test log line"]`
assert.JSONEq(t, expected, string(data))
}
func TestSample_UnmarshalJSON(t *testing.T) {
t.Run("valid sample", func(t *testing.T) {
data := `["1234567890000000000","test log line"]`
var sample Sample
err := json.Unmarshal([]byte(data), &sample)
require.NoError(t, err)
assert.Equal(t, time.Unix(0, 1234567890000000000), sample.T)
assert.Equal(t, "test log line", sample.V)
})
t.Run("invalid format", func(t *testing.T) {
data := `"invalid"`
var sample Sample
err := json.Unmarshal([]byte(data), &sample)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to deserialize sample")
})
t.Run("invalid timestamp", func(t *testing.T) {
data := `["not-a-number","test log line"]`
var sample Sample
err := json.Unmarshal([]byte(data), &sample)
assert.Error(t, err)
assert.Contains(t, err.Error(), "timestamp in Loki sample not convertible")
})
}
func TestClient_Push(t *testing.T) {
t.Run("successful push", func(t *testing.T) {
var receivedBody PushRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/loki/api/v1/push", r.URL.Path)
assert.Equal(t, http.MethodPost, r.Method)
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
err := json.NewDecoder(r.Body).Decode(&receivedBody)
require.NoError(t, err)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := createTestClient(t, server.URL, server.URL)
streams := []Stream{
{
Stream: map[string]string{"job": "test"},
Values: []Sample{
{T: time.Unix(0, 1234567890000000000), V: "log line 1"},
{T: time.Unix(0, 1234567891000000000), V: "log line 2"},
},
},
}
err := client.Push(context.Background(), streams)
assert.NoError(t, err)
// Verify the request body
assert.Len(t, receivedBody.Streams, 1)
assert.Equal(t, "test", receivedBody.Streams[0].Stream["job"])
assert.Len(t, receivedBody.Streams[0].Values, 2)
})
t.Run("push failure", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("Bad request"))
}))
defer server.Close()
client := createTestClient(t, server.URL, server.URL)
streams := []Stream{{Stream: map[string]string{"job": "test"}}}
err := client.Push(context.Background(), streams)
assert.Error(t, err)
assert.Contains(t, err.Error(), "non-200 response")
})
}
func TestClient_RangeQuery(t *testing.T) {
t.Run("successful query", func(t *testing.T) {
expectedResponse := QueryRes{
Data: QueryData{
Result: []Stream{
{
Stream: map[string]string{"job": "test"},
Values: []Sample{
{T: time.Unix(0, 1234567890000000000), V: "log line 1"},
{T: time.Unix(0, 1234567891000000000), V: "log line 2"},
},
},
},
},
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/loki/api/v1/query_range", r.URL.Path)
assert.Equal(t, http.MethodGet, r.Method)
// Check query parameters
params := r.URL.Query()
assert.Equal(t, `{job="test"}`, params.Get("query"))
assert.Equal(t, "1000000000", params.Get("start"))
assert.Equal(t, "2000000000", params.Get("end"))
assert.Equal(t, "100", params.Get("limit"))
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(expectedResponse)
}))
defer server.Close()
client := createTestClient(t, server.URL, server.URL)
result, err := client.RangeQuery(
context.Background(),
`{job="test"}`,
1000000000, // start
2000000000, // end
100, // limit
)
assert.NoError(t, err)
assert.Len(t, result.Data.Result, 1)
assert.Equal(t, "test", result.Data.Result[0].Stream["job"])
assert.Len(t, result.Data.Result[0].Values, 2)
})
t.Run("query without limit", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
assert.Equal(t, "", params.Get("limit")) // Should not be set
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(QueryRes{})
}))
defer server.Close()
client := createTestClient(t, server.URL, server.URL)
_, err := client.RangeQuery(context.Background(), `{job="test"}`, 1000000000, 2000000000, 0)
assert.NoError(t, err)
})
t.Run("query failure", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("Bad query"))
}))
defer server.Close()
client := createTestClient(t, server.URL, server.URL)
_, err := client.RangeQuery(context.Background(), `{job="test"}`, 1000000000, 2000000000, 100)
assert.Error(t, err)
assert.Contains(t, err.Error(), "non-200 response")
})
t.Run("invalid JSON response", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte("invalid json"))
}))
defer server.Close()
client := createTestClient(t, server.URL, server.URL)
_, err := client.RangeQuery(context.Background(), `{job="test"}`, 1000000000, 2000000000, 100)
assert.Error(t, err)
assert.Contains(t, err.Error(), "error unmarshaling loki response")
})
}
func TestClient_setAuthAndTenantHeaders(t *testing.T) {
t.Run("with basic auth and tenant", func(t *testing.T) {
cfg := createTestConfig(t, "http://localhost", "http://localhost")
cfg.BasicAuthUser = "testuser"
cfg.BasicAuthPassword = "testpass"
cfg.TenantID = "test-tenant"
client := NewClient(cfg)
req, _ := http.NewRequest(http.MethodGet, "http://localhost", nil)
client.setAuthAndTenantHeaders(req)
username, password, ok := req.BasicAuth()
assert.True(t, ok)
assert.Equal(t, "testuser", username)
assert.Equal(t, "testpass", password)
assert.Equal(t, "test-tenant", req.Header.Get("X-Scope-OrgID"))
})
t.Run("without auth", func(t *testing.T) {
cfg := createTestConfig(t, "http://localhost", "http://localhost")
client := NewClient(cfg)
req, _ := http.NewRequest(http.MethodGet, "http://localhost", nil)
client.setAuthAndTenantHeaders(req)
_, _, ok := req.BasicAuth()
assert.False(t, ok)
assert.Equal(t, "", req.Header.Get("X-Scope-OrgID"))
})
}
func TestStream_JSONRoundtrip(t *testing.T) {
original := Stream{
Stream: map[string]string{
"job": "test-job",
"instance": "test-instance",
"namespace": "test-ns",
},
Values: []Sample{
{T: time.Unix(0, 1234567890000000000), V: "log line 1"},
{T: time.Unix(0, 1234567891000000000), V: "log line 2"},
{T: time.Unix(0, 1234567892000000000), V: "log line 3"},
},
}
// Marshal to JSON
data, err := json.Marshal(original)
require.NoError(t, err)
// Unmarshal back
var restored Stream
err = json.Unmarshal(data, &restored)
require.NoError(t, err)
// Verify all fields match
assert.Equal(t, original.Stream, restored.Stream)
assert.Len(t, restored.Values, len(original.Values))
for i, sample := range original.Values {
assert.True(t, sample.T.Equal(restored.Values[i].T),
fmt.Sprintf("Timestamp mismatch at index %d: expected %v, got %v", i, sample.T, restored.Values[i].T))
assert.Equal(t, sample.V, restored.Values[i].V)
}
}
// Helper functions
func createTestClient(t *testing.T, readURL, writeURL string) *Client {
cfg := createTestConfig(t, readURL, writeURL)
return NewClient(cfg)
}
func createTestConfig(t *testing.T, readURL, writeURL string) Config {
readParsed, err := url.Parse(readURL)
require.NoError(t, err)
writeParsed, err := url.Parse(writeURL)
require.NoError(t, err)
return Config{
ReadPathURL: readParsed,
WritePathURL: writeParsed,
ExternalLabels: map[string]string{"source": "test"},
MaxQuerySize: 1000,
}
}
func TestClient_ContextCancellation(t *testing.T) {
t.Run("push with cancelled context", func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Error("Handler should not be called with cancelled context")
}))
defer server.Close()
client := createTestClient(t, server.URL, server.URL)
ctx, cancel := context.WithCancel(context.Background())
cancel()
streams := []Stream{{Stream: map[string]string{"job": "test"}}}
err := client.Push(ctx, streams)
assert.Error(t, err)
assert.Contains(t, err.Error(), "context canceled")
})
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"path/filepath"
"strings"
"time"
@ -48,6 +49,7 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/migrate"
movepkg "github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/move"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs/sync"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/loki"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository/git"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository/github"
@ -76,6 +78,11 @@ var (
_ builder.OpenAPIPostProcessor = (*APIBuilder)(nil)
)
// JobHistoryConfig holds configuration for job history backends
type JobHistoryConfig struct {
Loki *loki.Config `json:"loki,omitempty"`
}
type APIBuilder struct {
features featuremgmt.FeatureToggles
usageStats usagestats.Service
@ -126,6 +133,7 @@ func NewAPIBuilder(
access authlib.AccessChecker,
tracer tracing.Tracer,
extraBuilders []ExtraBuilder,
jobHistoryConfig *JobHistoryConfig,
) *APIBuilder {
clients := resources.NewClientFactory(configProvider)
parsers := resources.NewParserFactory(clients)
@ -135,6 +143,12 @@ func NewAPIBuilder(
git.Mutator(repositorySecrets),
github.Mutator(repositorySecrets),
}
// Create job history based on configuration
// Default to in-memory cache if no config provided
jobHistory := jobs.NewJobHistoryCache()
if jobHistoryConfig != nil && jobHistoryConfig.Loki != nil {
jobHistory = jobs.NewLokiJobHistory(*jobHistoryConfig.Loki)
}
b := &APIBuilder{
mutators: mutators,
@ -153,7 +167,7 @@ func NewAPIBuilder(
unified: unified,
repositorySecrets: repositorySecrets,
access: access,
jobHistory: jobs.NewJobHistoryCache(),
jobHistory: jobHistory,
availableRepositoryTypes: map[provisioning.RepositoryType]bool{
provisioning.LocalRepositoryType: true,
provisioning.GitHubRepositoryType: true,
@ -176,6 +190,38 @@ func NewAPIBuilder(
return b
}
// createJobHistoryConfigFromSettings creates JobHistoryConfig from Grafana settings
func createJobHistoryConfigFromSettings(cfg *setting.Cfg) *JobHistoryConfig {
// If LokiURL is defined, use Loki
if cfg.ProvisioningLokiURL != "" {
parsedURL, err := url.Parse(cfg.ProvisioningLokiURL)
if err != nil {
logging.DefaultLogger.Error("Invalid Loki URL in provisioning config", "url", cfg.ProvisioningLokiURL, "error", err)
return &JobHistoryConfig{}
}
lokiCfg := &loki.Config{
ReadPathURL: parsedURL,
WritePathURL: parsedURL,
BasicAuthUser: cfg.ProvisioningLokiUser,
BasicAuthPassword: cfg.ProvisioningLokiPassword,
TenantID: cfg.ProvisioningLokiTenantID,
ExternalLabels: map[string]string{
"source": "grafana-provisioning",
"service_name": "grafana-provisioning",
},
MaxQuerySize: 5000, // Default query size
}
return &JobHistoryConfig{
Loki: lokiCfg,
}
}
// Default to memory backend
return &JobHistoryConfig{}
}
// RegisterAPIService returns an API builder, from [NewAPIBuilder]. It is called by Wire.
// This function happily uses services core to Grafana, and does not need to be multi-tenancy-compatible.
func RegisterAPIService(
@ -213,6 +259,7 @@ func RegisterAPIService(
access,
tracer,
extraBuilders,
createJobHistoryConfigFromSettings(cfg),
)
apiregistration.RegisterAPI(builder)
return builder, nil

View File

@ -132,10 +132,15 @@ type Cfg struct {
HomePath string
ProvisioningPath string
PermittedProvisioningPaths []string
DataPath string
LogsPath string
PluginsPath string
EnterpriseLicensePath string
// Job History Configuration
ProvisioningLokiURL string
ProvisioningLokiUser string
ProvisioningLokiPassword string
ProvisioningLokiTenantID string
DataPath string
LogsPath string
PluginsPath string
EnterpriseLicensePath string
// SMTP email settings
Smtp SmtpSettings
@ -553,7 +558,7 @@ type Cfg struct {
ScopesListScopesURL string
ScopesListDashboardsURL string
//Short Links
// Short Links
ShortLinkExpiration int
// Unified Storage
@ -787,7 +792,7 @@ func (cfg *Cfg) readAnnotationSettings() error {
dashboardAnnotation := cfg.Raw.Section("annotations.dashboard")
apiIAnnotation := cfg.Raw.Section("annotations.api")
var newAnnotationCleanupSettings = func(section *ini.Section, maxAgeField string) AnnotationCleanupSettings {
newAnnotationCleanupSettings := func(section *ini.Section, maxAgeField string) AnnotationCleanupSettings {
maxAge, err := gtime.ParseDuration(section.Key(maxAgeField).MustString(""))
if err != nil {
maxAge = 0
@ -1771,7 +1776,8 @@ func readUserSettings(iniFile *ini.File, cfg *Cfg) error {
string(identity.RoleNone),
string(identity.RoleViewer),
string(identity.RoleEditor),
string(identity.RoleAdmin)})
string(identity.RoleAdmin),
})
cfg.VerifyEmailEnabled = users.Key("verify_email_enabled").MustBool(false)
// Deprecated
@ -2078,6 +2084,13 @@ func (cfg *Cfg) readProvisioningSettings(iniFile *ini.File) error {
cfg.PermittedProvisioningPaths[i] = makeAbsolute(s, cfg.HomePath)
}
}
// Read job history configuration
cfg.ProvisioningLokiURL = valueAsString(iniFile.Section("provisioning"), "loki_url", "")
cfg.ProvisioningLokiUser = valueAsString(iniFile.Section("provisioning"), "loki_user", "")
cfg.ProvisioningLokiPassword = valueAsString(iniFile.Section("provisioning"), "loki_password", "")
cfg.ProvisioningLokiTenantID = valueAsString(iniFile.Section("provisioning"), "loki_tenant_id", "")
return nil
}