Unified Storage: Adds pruner to kv backend (#110549)

* WIP adding pruner to kv store impl

* pruner only keeps 20 most recent versions

* ignore grafana-kv-data folder

* extracts pruner code into pruner.go. Adds tests. Adds kvBackendOptions.

* update logging, comments, exports kvbackendoptions fields

* update nooppruner ref

* fixes field casing in test

* fix test

* linter fixes

* remove comment

* make KvStorageBackend private

* Adds pruner key validation and tests. Fixes broken tests.

* update error message when validating pruner key
owensmallwood, 2025-09-05 10:02:11 -06:00 (committed via GitHub)
parent f0095d84e3
commit d715bda8af
7 changed files with 386 additions and 69 deletions

.gitignore

@@ -245,3 +245,7 @@ public/mockServiceWorker.js
/e2e-playwright/test-plugins/*/dist
/apps/provisioning/cmd/job-controller/bin/
# Ignore unified storage kv store files
/grafana-kv-data


@@ -0,0 +1,30 @@
package resource
import "context"
// Pruner is a small abstraction that allows for different pruner implementations.
// This can be removed once the debouncer is deployed.
type Pruner interface {
Add(key PruningKey) error
Start(ctx context.Context)
}
// PruningKey is a comparable key for pruning history.
type PruningKey struct {
Namespace string
Group string
Resource string
Name string
}
func (k PruningKey) Validate() bool {
return k.Namespace != "" && k.Group != "" && k.Resource != "" && k.Name != ""
}
type NoopPruner struct{}
func (p *NoopPruner) Add(key PruningKey) error {
return nil
}
func (p *NoopPruner) Start(ctx context.Context) {}
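For illustration only, a minimal sketch of how a caller might combine the key validation above with whichever Pruner implementation is configured; enqueuePrune is a hypothetical helper (not part of this change) and assumes the types above plus an fmt import.

// enqueuePrune is a hypothetical helper: it rejects incomplete keys
// before handing them to the configured Pruner (NoopPruner or a
// debounced pruner).
func enqueuePrune(p Pruner, key PruningKey) error {
	if !key.Validate() {
		return fmt.Errorf("invalid pruning key, all fields must be set: %+v", key)
	}
	return p.Add(key)
}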


@@ -0,0 +1,67 @@
package resource
import "testing"
func TestPrunerValidate(t *testing.T) {
tests := []struct {
name string
key PruningKey
expected bool
}{
{
name: "valid key",
key: PruningKey{
Namespace: "default",
Group: "apps",
Resource: "deployments",
Name: "my-deployment",
},
expected: true,
},
{
name: "missing namespace",
key: PruningKey{
Group: "apps",
Resource: "deployments",
Name: "my-deployment",
},
expected: false,
},
{
name: "missing group",
key: PruningKey{
Namespace: "default",
Resource: "deployments",
Name: "my-deployment",
},
expected: false,
},
{
name: "missing resource",
key: PruningKey{
Namespace: "default",
Group: "apps",
Name: "my-deployment",
},
expected: false,
},
{
name: "missing name",
key: PruningKey{
Namespace: "default",
Group: "apps",
Resource: "deployments",
},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.key.Validate()
if result != tt.expected {
t.Errorf("expected %v, got %v", tt.expected, result)
}
})
}
}


@@ -18,34 +18,52 @@ import (
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/debouncer"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
defaultListBufferSize = 100
prunerMaxEvents = 20
)
// Unified storage backend based on KV storage.
// kvStorageBackend is a unified storage backend based on KV storage.
type kvStorageBackend struct {
snowflake *snowflake.Node
kv KV
dataStore *dataStore
metaStore *metadataStore
eventStore *eventStore
notifier *notifier
builder DocumentBuilder
log logging.Logger
snowflake *snowflake.Node
kv KV
dataStore *dataStore
metaStore *metadataStore
eventStore *eventStore
notifier *notifier
builder DocumentBuilder
log logging.Logger
withPruner bool
historyPruner Pruner
//tracer trace.Tracer
//reg prometheus.Registerer
}
var _ StorageBackend = &kvStorageBackend{}
func NewKvStorageBackend(kv KV) *kvStorageBackend {
type KvBackendOptions struct {
KvStore KV
WithPruner bool
Tracer trace.Tracer // TODO add tracing
Reg prometheus.Registerer // TODO add metrics
}
func NewKvStorageBackend(opts KvBackendOptions) (StorageBackend, error) {
ctx := context.Background()
kv := opts.KvStore
s, err := snowflake.NewNode(rand.Int64N(1024))
if err != nil {
panic(err)
return nil, fmt.Errorf("failed to create snowflake node: %w", err)
}
eventStore := newEventStore(kv)
return &kvStorageBackend{
backend := &kvStorageBackend{
kv: kv,
dataStore: newDataStore(kv),
metaStore: newMetadataStore(kv),
@@ -55,6 +73,72 @@ func NewKvStorageBackend(kv KV) *kvStorageBackend {
builder: StandardDocumentBuilder(), // For now we use the standard document builder.
log: &logging.NoOpLogger{}, // Make this configurable
}
err = backend.initPruner(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize pruner: %w", err)
}
return backend, nil
}
func (k *kvStorageBackend) pruneEvents(ctx context.Context, key PruningKey) error {
if !key.Validate() {
return fmt.Errorf("invalid pruning key, all fields must be set: %+v", key)
}
keepEvents := make([]DataKey, 0, prunerMaxEvents)
// iterate over all keys for the resource and delete versions beyond the latest 20
for datakey, err := range k.dataStore.Keys(ctx, ListRequestKey(key)) {
if err != nil {
return err
}
if len(keepEvents) < prunerMaxEvents {
keepEvents = append(keepEvents, datakey)
continue
}
// If we already have 20 versions, delete the oldest one and append the new one
err := k.dataStore.Delete(ctx, keepEvents[0])
if err != nil {
return err
}
keepEvents = append(keepEvents[1:], datakey)
}
return nil
}
func (k *kvStorageBackend) initPruner(ctx context.Context) error {
if !k.withPruner {
k.log.Debug("Pruner disabled, using noop pruner")
k.historyPruner = &NoopPruner{}
return nil
}
k.log.Debug("Initializing history pruner")
pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[PruningKey]{
Name: "history_pruner",
BufferSize: 1000,
MinWait: time.Second * 30,
MaxWait: time.Minute * 5,
ProcessHandler: k.pruneEvents,
ErrorHandler: func(key PruningKey, err error) {
k.log.Error("failed to prune history",
"namespace", key.Namespace,
"group", key.Group,
"resource", key.Resource,
"name", key.Name,
"error", err)
},
})
if err != nil {
return err
}
k.historyPruner = pruner
k.historyPruner.Start(ctx)
return nil
}
// WriteEvent writes a resource event (create/update/delete) to the storage backend.
@@ -150,6 +234,14 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in
if err != nil {
return 0, fmt.Errorf("failed to save event: %w", err)
}
_ = k.historyPruner.Add(PruningKey{
Namespace: event.Key.Namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
})
return rv, nil
}
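The loop in pruneEvents is a sliding window: assuming dataStore.Keys yields keys in ascending resource-version order, the first prunerMaxEvents keys fill the window and every further key evicts (deletes) the current oldest entry, so only the newest prunerMaxEvents versions survive. A self-contained sketch of the same window logic over plain resource versions (keepNewest is a hypothetical helper, not part of this diff):

// keepNewest walks versions in ascending order and returns the last n
// of them, along with the older versions that would be evicted.
func keepNewest(versions []int64, n int) (kept, evicted []int64) {
	window := make([]int64, 0, n)
	for _, v := range versions {
		if len(window) < n {
			window = append(window, v)
			continue
		}
		evicted = append(evicted, window[0])
		window = append(window[1:], v)
	}
	return window, evicted
}

For example, keepNewest([]int64{1, 2, 3, 4, 5}, 3) keeps 3, 4, 5 and evicts 1 and 2.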


@@ -27,7 +27,14 @@ var appsNamespace = NamespacedResource{
func setupTestStorageBackend(t *testing.T) *kvStorageBackend {
kv := setupTestKV(t)
return NewKvStorageBackend(kv)
opts := KvBackendOptions{
KvStore: kv,
WithPruner: true,
}
backend, err := NewKvStorageBackend(opts)
require.NoError(t, err)
kvBackend := backend.(*kvStorageBackend)
return kvBackend
}
func TestNewKvStorageBackend(t *testing.T) {
@@ -1180,6 +1187,141 @@ func TestKvStorageBackend_GetResourceStats_Success(t *testing.T) {
require.Equal(t, int64(2), filteredStats[0].Count)
}
func TestKvStorageBackend_PruneEvents(t *testing.T) {
t.Run("will prune oldest events when exceeding limit", func(t *testing.T) {
backend := setupTestStorageBackend(t)
ctx := context.Background()
// Create a resource
testObj, err := createTestObjectWithName("test-resource", "apps", "test-data")
require.NoError(t, err)
metaAccessor, err := utils.MetaAccessor(testObj)
require.NoError(t, err)
writeEvent := WriteEvent{
Type: resourcepb.WatchEvent_ADDED,
Key: &resourcepb.ResourceKey{
Namespace: "default",
Group: "apps",
Resource: "resources",
Name: "test-resource",
},
Value: objectToJSONBytes(t, testObj),
Object: metaAccessor,
PreviousRV: 0,
}
rv1, err := backend.WriteEvent(ctx, writeEvent)
require.NoError(t, err)
// Update the resource prunerMaxEvents times. This will create one more event than the pruner limit.
previousRV := rv1
for i := 0; i < prunerMaxEvents; i++ {
testObj.Object["spec"].(map[string]any)["value"] = fmt.Sprintf("update-%d", i)
writeEvent.Type = resourcepb.WatchEvent_MODIFIED
writeEvent.Value = objectToJSONBytes(t, testObj)
writeEvent.PreviousRV = previousRV
newRv, err := backend.WriteEvent(ctx, writeEvent)
require.NoError(t, err)
previousRV = newRv
}
pruningKey := PruningKey{
Namespace: "default",
Group: "apps",
Resource: "resources",
Name: "test-resource",
}
err = backend.pruneEvents(ctx, pruningKey)
require.NoError(t, err)
// Verify the first event has been pruned (rv1)
eventKey1 := DataKey{
Namespace: "default",
Group: "apps",
Resource: "resources",
Name: "test-resource",
ResourceVersion: rv1,
}
_, err = backend.dataStore.Get(ctx, eventKey1)
require.Error(t, err) // Should return error as event is pruned
// assert prunerMaxEvents most recent events exist
counter := 0
for datakey, err := range backend.dataStore.Keys(ctx, ListRequestKey{
Namespace: "default",
Group: "apps",
Resource: "resources",
Name: "test-resource",
}) {
require.NoError(t, err)
require.NotEqual(t, rv1, datakey.ResourceVersion)
counter++
}
require.Equal(t, prunerMaxEvents, counter)
})
t.Run("will not prune events when less than limit", func(t *testing.T) {
backend := setupTestStorageBackend(t)
ctx := context.Background()
// Create a resource
testObj, err := createTestObjectWithName("test-resource", "apps", "test-data")
require.NoError(t, err)
metaAccessor, err := utils.MetaAccessor(testObj)
require.NoError(t, err)
writeEvent := WriteEvent{
Type: resourcepb.WatchEvent_ADDED,
Key: &resourcepb.ResourceKey{
Namespace: "default",
Group: "apps",
Resource: "resources",
Name: "test-resource",
},
Value: objectToJSONBytes(t, testObj),
Object: metaAccessor,
PreviousRV: 0,
}
rv1, err := backend.WriteEvent(ctx, writeEvent)
require.NoError(t, err)
// Update the resource prunerMaxEvents-1 times. This will create the same number of events as the pruner limit.
previousRV := rv1
for i := 0; i < prunerMaxEvents-1; i++ {
testObj.Object["spec"].(map[string]any)["value"] = fmt.Sprintf("update-%d", i)
writeEvent.Type = resourcepb.WatchEvent_MODIFIED
writeEvent.Value = objectToJSONBytes(t, testObj)
writeEvent.PreviousRV = previousRV
newRv, err := backend.WriteEvent(ctx, writeEvent)
require.NoError(t, err)
previousRV = newRv
}
pruningKey := PruningKey{
Namespace: "default",
Group: "apps",
Resource: "resources",
Name: "test-resource",
}
err = backend.pruneEvents(ctx, pruningKey)
require.NoError(t, err)
// assert all events exist
counter := 0
for _, err := range backend.dataStore.Keys(ctx, ListRequestKey{
Namespace: "default",
Group: "apps",
Resource: "resources",
Name: "test-resource",
}) {
require.NoError(t, err)
counter++
}
require.Equal(t, prunerMaxEvents, counter)
})
}
// createTestObject creates a test unstructured object with standard values
func createTestObject() (*unstructured.Unstructured, error) {
return createTestObjectWithName("test-resource", appsNamespace, "test data")


@@ -91,29 +91,6 @@ func NewBackend(opts BackendOptions) (Backend, error) {
}, nil
}
// pruningKey is a comparable key for pruning history.
type pruningKey struct {
namespace string
group string
resource string
name string
}
// Small abstraction to allow for different pruner implementations.
// This can be removed once the debouncer is deployed.
type pruner interface {
Add(key pruningKey) error
Start(ctx context.Context)
}
type noopPruner struct{}
func (p *noopPruner) Add(key pruningKey) error {
return nil
}
func (p *noopPruner) Start(ctx context.Context) {}
type backend struct {
//general
isHA bool
@@ -148,7 +125,7 @@ type backend struct {
// testing
simulatedNetworkLatency time.Duration
historyPruner pruner
historyPruner resource.Pruner
withPruner bool
}
@@ -205,26 +182,26 @@ func (b *backend) initLocked(ctx context.Context) error {
func (b *backend) initPruner(ctx context.Context) error {
if !b.withPruner {
b.log.Debug("using noop history pruner")
b.historyPruner = &noopPruner{}
b.historyPruner = &resource.NoopPruner{}
return nil
}
b.log.Debug("using debounced history pruner")
// Initialize history pruner.
pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[pruningKey]{
pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[resource.PruningKey]{
Name: "history_pruner",
BufferSize: 1000,
MinWait: time.Second * 30,
MaxWait: time.Minute * 5,
ProcessHandler: func(ctx context.Context, key pruningKey) error {
ProcessHandler: func(ctx context.Context, key resource.PruningKey) error {
return b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
res, err := dbutil.Exec(ctx, tx, sqlResourceHistoryPrune, &sqlPruneHistoryRequest{
SQLTemplate: sqltemplate.New(b.dialect),
HistoryLimit: defaultPrunerHistoryLimit,
Key: &resourcepb.ResourceKey{
Namespace: key.namespace,
Group: key.group,
Resource: key.resource,
Name: key.name,
Namespace: key.Namespace,
Group: key.Group,
Resource: key.Resource,
Name: key.Name,
},
})
if err != nil {
@@ -235,20 +212,20 @@ func (b *backend) initPruner(ctx context.Context) error {
return fmt.Errorf("failed to get rows affected: %w", err)
}
b.log.Debug("pruned history successfully",
"namespace", key.namespace,
"group", key.group,
"resource", key.resource,
"name", key.name,
"namespace", key.Namespace,
"group", key.Group,
"resource", key.Resource,
"name", key.Name,
"rows", rows)
return nil
})
},
ErrorHandler: func(key pruningKey, err error) {
ErrorHandler: func(key resource.PruningKey, err error) {
b.log.Error("failed to prune history",
"namespace", key.namespace,
"group", key.group,
"resource", key.resource,
"name", key.name,
"namespace", key.Namespace,
"group", key.Group,
"resource", key.Resource,
"name", key.Name,
"error", err)
},
Reg: b.reg,
@@ -361,11 +338,11 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
}); err != nil {
return event.GUID, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{
namespace: event.Key.Namespace,
group: event.Key.Group,
resource: event.Key.Resource,
name: event.Key.Name,
_ = b.historyPruner.Add(resource.PruningKey{
Namespace: event.Key.Namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
})
if b.simulatedNetworkLatency > 0 {
time.Sleep(b.simulatedNetworkLatency)
@@ -448,11 +425,11 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
}); err != nil {
return event.GUID, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{
namespace: event.Key.Namespace,
group: event.Key.Group,
resource: event.Key.Resource,
name: event.Key.Name,
_ = b.historyPruner.Add(resource.PruningKey{
Namespace: event.Key.Namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
})
return event.GUID, nil
})
@@ -502,11 +479,11 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
}); err != nil {
return event.GUID, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{
namespace: event.Key.Namespace,
group: event.Key.Group,
resource: event.Key.Resource,
name: event.Key.Name,
_ = b.historyPruner.Add(resource.PruningKey{
Namespace: event.Key.Namespace,
Group: event.Key.Group,
Resource: event.Key.Resource,
Name: event.Key.Name,
})
return event.GUID, nil
})


@@ -18,7 +18,12 @@ func TestBadgerKVStorageBackend(t *testing.T) {
t.Cleanup(func() {
_ = db.Close()
})
return resource.NewKvStorageBackend(resource.NewBadgerKV(db))
kvOpts := resource.KvBackendOptions{
KvStore: resource.NewBadgerKV(db),
}
backend, err := resource.NewKvStorageBackend(kvOpts)
require.NoError(t, err)
return backend
}, &TestOptions{
NSPrefix: "kvstorage-test",
SkipTests: map[string]bool{