package resource

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"iter"
	"math/rand/v2"
	"net/http"
	"sort"
	"time"

	"github.com/bwmarrin/snowflake"
	"github.com/grafana/grafana-app-sdk/logging"
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel/trace"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/grafana/grafana/pkg/apimachinery/utils"
	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
	"github.com/grafana/grafana/pkg/util/debouncer"
)

const (
	defaultListBufferSize       = 100
	prunerMaxEvents             = 20
	defaultEventRetentionPeriod = 1 * time.Hour
	defaultEventPruningInterval = 5 * time.Minute
)

// kvStorageBackend is a unified storage backend built on top of a KV store.
type kvStorageBackend struct {
	snowflake            *snowflake.Node
	kv                   KV
	dataStore            *dataStore
	eventStore           *eventStore
	notifier             *notifier
	builder              DocumentBuilder
	log                  logging.Logger
	withPruner           bool
	eventRetentionPeriod time.Duration
	eventPruningInterval time.Duration
	historyPruner        Pruner
	//tracer trace.Tracer
	//reg prometheus.Registerer
}

var _ StorageBackend = &kvStorageBackend{}
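
// KVBackendOptions holds the dependencies and tuning knobs for NewKVStorageBackend.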
type KVBackendOptions struct {
	KvStore              KV
	WithPruner           bool
	EventRetentionPeriod time.Duration         // How long to keep events (default: 1 hour)
	EventPruningInterval time.Duration         // How often to run the event pruning (default: 5 minutes)
	Tracer               trace.Tracer          // TODO add tracing
	Reg                  prometheus.Registerer // TODO add metrics
}
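
// NewKVStorageBackend creates a KV-backed StorageBackend, applying defaults
// for the event retention and pruning intervals and starting the background
// event-cleanup goroutine.
//
// A minimal usage sketch (kvImpl stands in for any KV implementation):
//
//	backend, err := NewKVStorageBackend(KVBackendOptions{
//		KvStore:    kvImpl,
//		WithPruner: true,
//	})
//	if err != nil {
//		// handle the error
//	}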
func NewKVStorageBackend(opts KVBackendOptions) (StorageBackend, error) {
	ctx := context.Background()
	kv := opts.KvStore

	s, err := snowflake.NewNode(rand.Int64N(1024))
	if err != nil {
		return nil, fmt.Errorf("failed to create snowflake node: %w", err)
	}
	eventStore := newEventStore(kv)

	eventRetentionPeriod := opts.EventRetentionPeriod
	if eventRetentionPeriod <= 0 {
		eventRetentionPeriod = defaultEventRetentionPeriod
	}

	eventPruningInterval := opts.EventPruningInterval
	if eventPruningInterval <= 0 {
		eventPruningInterval = defaultEventPruningInterval
	}

	backend := &kvStorageBackend{
		kv:                   kv,
		dataStore:            newDataStore(kv),
		eventStore:           eventStore,
		notifier:             newNotifier(eventStore, notifierOptions{}),
		snowflake:            s,
		builder:              StandardDocumentBuilder(), // For now we use the standard document builder.
		log:                  &logging.NoOpLogger{},     // TODO: make this configurable
		withPruner:           opts.WithPruner,           // propagate the option so initPruner can enable pruning
		eventRetentionPeriod: eventRetentionPeriod,
		eventPruningInterval: eventPruningInterval,
	}
	err = backend.initPruner(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize pruner: %w", err)
	}

	// Start the event cleanup background job
	go backend.runCleanupOldEvents(ctx)

	return backend, nil
}

// runCleanupOldEvents runs a background loop that periodically cleans up old events.
func (k *kvStorageBackend) runCleanupOldEvents(ctx context.Context) {
	// Run cleanup on every pruning interval tick.
	ticker := time.NewTicker(k.eventPruningInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			k.log.Debug("Event cleanup stopped due to context cancellation")
			return
		case <-ticker.C:
			k.cleanupOldEvents(ctx)
		}
	}
}

// cleanupOldEvents performs the actual cleanup of old events
func (k *kvStorageBackend) cleanupOldEvents(ctx context.Context) {
	cutoff := time.Now().Add(-k.eventRetentionPeriod)
	deletedCount, err := k.eventStore.CleanupOldEvents(ctx, cutoff)
	if err != nil {
		k.log.Error("Failed to cleanup old events", "error", err)
		return
	}

	if deletedCount > 0 {
		k.log.Info("Cleaned up old events", "deleted_count", deletedCount, "retention_period", k.eventRetentionPeriod)
	}
}
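
// pruneEvents trims the stored history of a single resource down to the
// prunerMaxEvents most recent create/update versions; deletion markers are
// never counted or removed.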
func (k *kvStorageBackend) pruneEvents(ctx context.Context, key PruningKey) error {
	if !key.Validate() {
		return fmt.Errorf("invalid pruning key, all fields must be set: %+v", key)
	}

	counter := 0
	// Iterate over all keys for the resource and delete versions beyond the latest prunerMaxEvents.
	for datakey, err := range k.dataStore.Keys(ctx, ListRequestKey{
		Namespace: key.Namespace,
		Group:     key.Group,
		Resource:  key.Resource,
		Name:      key.Name,
	}, SortOrderDesc) {
		if err != nil {
			return err
		}

		// The pruner needs to exclude deleted events.
		if counter < prunerMaxEvents && datakey.Action != DataActionDeleted {
			counter++
			continue
		}

		// If we already retain prunerMaxEvents versions, delete any further create or update events.
		if datakey.Action != DataActionDeleted {
			err := k.dataStore.Delete(ctx, datakey)
			if err != nil {
				return err
			}
		}
	}

	return nil
}
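
// initPruner wires up the debounced history pruner, falling back to a no-op
// pruner when pruning is disabled.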
func (k *kvStorageBackend) initPruner(ctx context.Context) error {
	if !k.withPruner {
		k.log.Debug("Pruner disabled, using noop pruner")
		k.historyPruner = &NoopPruner{}
		return nil
	}

	k.log.Debug("Initializing history pruner")
	pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[PruningKey]{
		Name:           "history_pruner",
		BufferSize:     1000,
		MinWait:        time.Second * 30,
		MaxWait:        time.Minute * 5,
		ProcessHandler: k.pruneEvents,
		ErrorHandler: func(key PruningKey, err error) {
			k.log.Error("failed to prune history",
				"namespace", key.Namespace,
				"group", key.Group,
				"resource", key.Resource,
				"name", key.Name,
				"error", err)
		},
	})
	if err != nil {
		return err
	}

	k.historyPruner = pruner
	k.historyPruner.Start(ctx)
	return nil
}

// WriteEvent writes a resource event (create/update/delete) to the storage backend.
func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (int64, error) {
	if err := event.Validate(); err != nil {
		return 0, fmt.Errorf("invalid event: %w", err)
	}
	rv := k.snowflake.Generate().Int64()

	obj := event.Object
	// Resolve the data action from the event type.
	var action DataAction
	switch event.Type {
	case resourcepb.WatchEvent_ADDED:
		action = DataActionCreated
		// Check if resource already exists for create operations
		_, err := k.dataStore.GetLatestResourceKey(ctx, GetRequestKey{
			Group:     event.Key.Group,
			Resource:  event.Key.Resource,
			Namespace: event.Key.Namespace,
			Name:      event.Key.Name,
		})
		if err == nil {
			// Resource exists, return already exists error
			return 0, ErrResourceAlreadyExists
		}
		if !errors.Is(err, ErrNotFound) {
			// Some other error occurred
			return 0, fmt.Errorf("failed to check if resource exists: %w", err)
		}
	case resourcepb.WatchEvent_MODIFIED:
		action = DataActionUpdated
	case resourcepb.WatchEvent_DELETED:
		action = DataActionDeleted
		obj = event.ObjectOld
	default:
		return 0, fmt.Errorf("invalid event type: %d", event.Type)
	}

	if obj == nil {
		return 0, fmt.Errorf("object is nil")
	}

	// Write the data
	err := k.dataStore.Save(ctx, DataKey{
		Group:           event.Key.Group,
		Resource:        event.Key.Resource,
		Namespace:       event.Key.Namespace,
		Name:            event.Key.Name,
		ResourceVersion: rv,
		Action:          action,
		Folder:          obj.GetFolder(),
	}, bytes.NewReader(event.Value))
	if err != nil {
		return 0, fmt.Errorf("failed to write data: %w", err)
	}

	// Write event
	err = k.eventStore.Save(ctx, Event{
		Namespace:       event.Key.Namespace,
		Group:           event.Key.Group,
		Resource:        event.Key.Resource,
		Name:            event.Key.Name,
		ResourceVersion: rv,
		Action:          action,
		Folder:          obj.GetFolder(),
		PreviousRV:      event.PreviousRV,
	})
	if err != nil {
		return 0, fmt.Errorf("failed to save event: %w", err)
	}

	_ = k.historyPruner.Add(PruningKey{
		Namespace: event.Key.Namespace,
		Group:     event.Key.Group,
		Resource:  event.Key.Resource,
		Name:      event.Key.Name,
	})

	return rv, nil
}

func (k *kvStorageBackend) ReadResource(ctx context.Context, req *resourcepb.ReadRequest) *BackendReadResponse {
	if req.Key == nil {
		return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusBadRequest, Message: "missing key"}}
	}
	meta, err := k.dataStore.GetResourceKeyAtRevision(ctx, GetRequestKey{
		Group:     req.Key.Group,
		Resource:  req.Key.Resource,
		Namespace: req.Key.Namespace,
		Name:      req.Key.Name,
	}, req.ResourceVersion)
	if errors.Is(err, ErrNotFound) {
		return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusNotFound, Message: "not found"}}
	} else if err != nil {
		return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}}
	}
	data, err := k.dataStore.Get(ctx, DataKey{
		Group:           req.Key.Group,
		Resource:        req.Key.Resource,
		Namespace:       req.Key.Namespace,
		Name:            req.Key.Name,
		ResourceVersion: meta.ResourceVersion,
		Action:          meta.Action,
		Folder:          meta.Folder,
	})
	if err != nil {
		return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}}
	}
	if data == nil {
		// Guard separately: calling err.Error() with a nil error would panic.
		return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: "no data found"}}
	}
	value, err := readAndClose(data)
	if err != nil {
		return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}}
	}
	return &BackendReadResponse{
		Key:             req.Key,
		ResourceVersion: meta.ResourceVersion,
		Value:           value,
		Folder:          meta.Folder,
	}
}

// ListIterator returns an iterator for listing resources.
func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.ListRequest, cb func(ListIterator) error) (int64, error) {
	if req.Options == nil || req.Options.Key == nil {
		return 0, fmt.Errorf("missing options or key in ListRequest")
	}
	// Parse continue token if provided
	offset := int64(0)
	resourceVersion := req.ResourceVersion
	if req.NextPageToken != "" {
		token, err := GetContinueToken(req.NextPageToken)
		if err != nil {
			return 0, fmt.Errorf("invalid continue token: %w", err)
		}
		offset = token.StartOffset
		resourceVersion = token.ResourceVersion
	}

	// Default the listRV to a fresh snowflake (i.e. the current time), unless
	// an explicit resource version was requested.
	listRV := k.snowflake.Generate().Int64()
	if resourceVersion > 0 {
		listRV = resourceVersion
	}

	// Fetch the latest objects
	keys := make([]DataKey, 0, min(defaultListBufferSize, req.Limit+1))
	idx := 0
	for dataKey, err := range k.dataStore.ListResourceKeysAtRevision(ctx, ListRequestKey{
		Group:     req.Options.Key.Group,
		Resource:  req.Options.Key.Resource,
		Namespace: req.Options.Key.Namespace,
		Name:      req.Options.Key.Name,
	}, resourceVersion) {
		if err != nil {
			return 0, err
		}
		// Skip the first offset items. This is not efficient, but it's a simple way to implement it for now.
		if idx < int(offset) {
			idx++
			continue
		}
		keys = append(keys, dataKey)
		// Only fetch the first limit items + 1 to get the next token.
		if len(keys) >= int(req.Limit+1) {
			break
		}
	}
	iter := kvListIterator{
		keys:         keys,
		currentIndex: -1,
		ctx:          ctx,
		listRV:       listRV,
		offset:       offset,
		limit:        req.Limit + 1, // TODO: for now we need at least one more item. Fix the caller
		dataStore:    k.dataStore,
	}
	err := cb(&iter)
	if err != nil {
		return 0, err
	}

	return listRV, nil
}

// kvListIterator implements ListIterator for KV storage
type kvListIterator struct {
	ctx          context.Context
	keys         []DataKey
	currentIndex int
	dataStore    *dataStore
	listRV       int64
	offset       int64
	limit        int64

	// current
	rv    int64
	err   error
	value []byte
}
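
// Next advances the iterator and loads the value for the next key. It returns
// false once the keys or the limit are exhausted, or when a read fails.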
func (i *kvListIterator) Next() bool {
	i.currentIndex++

	if i.currentIndex >= len(i.keys) {
		return false
	}

	if int64(i.currentIndex) >= i.limit {
		return false
	}

	i.rv, i.err = i.keys[i.currentIndex].ResourceVersion, nil

	data, err := i.dataStore.Get(i.ctx, i.keys[i.currentIndex])
	if err != nil {
		i.err = err
		return false
	}

	i.value, i.err = readAndClose(data)
	if i.err != nil {
		return false
	}

	// increment the offset
	i.offset++

	return true
}

func (i *kvListIterator) Error() error {
	// Return the recorded error; returning nil here would hide read failures
	// from callers.
	return i.err
}

func (i *kvListIterator) ContinueToken() string {
	return ContinueToken{
		StartOffset:     i.offset,
		ResourceVersion: i.listRV,
	}.String()
}

func (i *kvListIterator) ResourceVersion() int64 {
	return i.rv
}

func (i *kvListIterator) Namespace() string {
	return i.keys[i.currentIndex].Namespace
}

func (i *kvListIterator) Name() string {
	return i.keys[i.currentIndex].Name
}

func (i *kvListIterator) Folder() string {
	return i.keys[i.currentIndex].Folder
}

func (i *kvListIterator) Value() []byte {
	return i.value
}
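
// validateListHistoryRequest checks that a history request fully identifies a
// single resource: group, resource, namespace, and name must all be set.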
func validateListHistoryRequest(req *resourcepb.ListRequest) error {
	if req.Options == nil || req.Options.Key == nil {
		return fmt.Errorf("missing options or key in ListRequest")
	}
	key := req.Options.Key
	if key.Group == "" {
		return fmt.Errorf("group is required")
	}
	if key.Resource == "" {
		return fmt.Errorf("resource is required")
	}
	if key.Namespace == "" {
		return fmt.Errorf("namespace is required")
	}
	if key.Name == "" {
		return fmt.Errorf("name is required")
	}
	return nil
}

// filterHistoryKeysByVersion filters history keys based on version match criteria
func filterHistoryKeysByVersion(historyKeys []DataKey, req *resourcepb.ListRequest) ([]DataKey, error) {
	switch req.GetVersionMatchV2() {
	case resourcepb.ResourceVersionMatchV2_Exact:
		if req.ResourceVersion <= 0 {
			return nil, fmt.Errorf("expecting an explicit resource version query when using Exact matching")
		}
		exactKeys := make([]DataKey, 0, len(historyKeys))
		for _, key := range historyKeys {
			if key.ResourceVersion == req.ResourceVersion {
				exactKeys = append(exactKeys, key)
			}
		}
		return exactKeys, nil
	case resourcepb.ResourceVersionMatchV2_NotOlderThan:
		if req.ResourceVersion > 0 {
			filteredKeys := make([]DataKey, 0, len(historyKeys))
			for _, key := range historyKeys {
				if key.ResourceVersion >= req.ResourceVersion {
					filteredKeys = append(filteredKeys, key)
				}
			}
			return filteredKeys, nil
		}
	default:
		if req.ResourceVersion > 0 {
			filteredKeys := make([]DataKey, 0, len(historyKeys))
			for _, key := range historyKeys {
				if key.ResourceVersion <= req.ResourceVersion {
					filteredKeys = append(filteredKeys, key)
				}
			}
			return filteredKeys, nil
		}
	}
	return historyKeys, nil
}

// applyLiveHistoryFilter applies "live" history logic by ignoring events before the last delete
func applyLiveHistoryFilter(filteredKeys []DataKey, req *resourcepb.ListRequest) []DataKey {
	useLatestDeletionAsMinRV := req.ResourceVersion == 0 && req.Source != resourcepb.ListRequest_TRASH && req.GetVersionMatchV2() != resourcepb.ResourceVersionMatchV2_Exact
	if !useLatestDeletionAsMinRV {
		return filteredKeys
	}

	latestDeleteRV := int64(0)
	for _, key := range filteredKeys {
		if key.Action == DataActionDeleted && key.ResourceVersion > latestDeleteRV {
			latestDeleteRV = key.ResourceVersion
		}
	}
	if latestDeleteRV > 0 {
		liveKeys := make([]DataKey, 0, len(filteredKeys))
		for _, key := range filteredKeys {
			if key.ResourceVersion > latestDeleteRV {
				liveKeys = append(liveKeys, key)
			}
		}
		return liveKeys
	}
	return filteredKeys
}

// sortByResourceVersion sorts the history keys based on the sortAscending flag
func sortByResourceVersion(filteredKeys []DataKey, sortAscending bool) {
	if sortAscending {
		sort.Slice(filteredKeys, func(i, j int) bool {
			return filteredKeys[i].ResourceVersion < filteredKeys[j].ResourceVersion
		})
	} else {
		sort.Slice(filteredKeys, func(i, j int) bool {
			return filteredKeys[i].ResourceVersion > filteredKeys[j].ResourceVersion
		})
	}
}

// applyPagination filters out keys at or before lastSeenRV, in the direction
// of the sort order.
func applyPagination(keys []DataKey, lastSeenRV int64, sortAscending bool) []DataKey {
	if lastSeenRV == 0 {
		return keys
	}

	pagedKeys := make([]DataKey, 0, len(keys))
	for _, key := range keys {
		if sortAscending && key.ResourceVersion > lastSeenRV {
			pagedKeys = append(pagedKeys, key)
		} else if !sortAscending && key.ResourceVersion < lastSeenRV {
			pagedKeys = append(pagedKeys, key)
		}
	}
	return pagedKeys
}
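
// ListModifiedSince returns a new list resource version together with a
// sequence of the resources in the given namespace/group/resource modified at
// or after sinceRv. Recent changes are answered from the event store; older
// ones fall back to a scan of the data store.
//
// A minimal consumption sketch (the key values are hypothetical):
//
//	rv, seq := backend.ListModifiedSince(ctx, NamespacedResource{
//		Namespace: "default", Group: "dashboard.grafana.app", Resource: "dashboards",
//	}, sinceRv)
//	for mod, err := range seq {
//		if err != nil {
//			break // handle the error
//		}
//		_ = mod // process the modified resource
//	}
//	_ = rv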
func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key NamespacedResource, sinceRv int64) (int64, iter.Seq2[*ModifiedResource, error]) {
	if !key.Valid() {
		return 0, func(yield func(*ModifiedResource, error) bool) {
			yield(nil, fmt.Errorf("group, resource, and namespace are required"))
		}
	}

	if sinceRv <= 0 {
		return 0, func(yield func(*ModifiedResource, error) bool) {
			yield(nil, fmt.Errorf("sinceRv must be greater than 0"))
		}
	}

	// Generate a new resource version for the list
	listRV := k.snowflake.Generate().Int64()

	// Check if sinceRv is older than 1 hour
	sinceRvTimestamp := snowflake.ID(sinceRv).Time()
	sinceTime := time.Unix(0, sinceRvTimestamp*int64(time.Millisecond))
	sinceRvAge := time.Since(sinceTime)

	if sinceRvAge > time.Hour {
		k.log.Debug("ListModifiedSince using data store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
		return listRV, k.listModifiedSinceDataStore(ctx, key, sinceRv)
	}

	k.log.Debug("ListModifiedSince using event store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
	return listRV, k.listModifiedSinceEventStore(ctx, key, sinceRv)
}
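
// convertEventType maps a DataAction to the corresponding watch event type.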
func convertEventType(action DataAction) resourcepb.WatchEvent_Type {
	switch action {
	case DataActionCreated:
		return resourcepb.WatchEvent_ADDED
	case DataActionUpdated:
		return resourcepb.WatchEvent_MODIFIED
	case DataActionDeleted:
		return resourcepb.WatchEvent_DELETED
	default:
		panic(fmt.Sprintf("unknown DataAction: %v", action))
	}
}
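
// getValueFromDataStore reads the full value stored under dataKey.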
func (k *kvStorageBackend) getValueFromDataStore(ctx context.Context, dataKey DataKey) ([]byte, error) {
	raw, err := k.dataStore.Get(ctx, dataKey)
	if err != nil {
		return nil, err
	}

	// Use readAndClose so the reader is always closed and never leaked.
	return readAndClose(raw)
}
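
// listModifiedSinceDataStore scans the data store keys in ascending order and
// yields the latest version of each resource modified at or after sinceRv.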
func (k *kvStorageBackend) listModifiedSinceDataStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] {
	return func(yield func(*ModifiedResource, error) bool) {
		var lastSeenResource *ModifiedResource
		var lastSeenDataKey DataKey
		for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{Namespace: key.Namespace, Group: key.Group, Resource: key.Resource}, SortOrderAsc) {
			if err != nil {
				yield(&ModifiedResource{}, err)
				return
			}

			if dataKey.ResourceVersion < sinceRv {
				continue
			}

			if lastSeenResource == nil {
				lastSeenResource = &ModifiedResource{
					Key: resourcepb.ResourceKey{
						Namespace: dataKey.Namespace,
						Group:     dataKey.Group,
						Resource:  dataKey.Resource,
						Name:      dataKey.Name,
					},
					ResourceVersion: dataKey.ResourceVersion,
					Action:          convertEventType(dataKey.Action),
				}
				lastSeenDataKey = dataKey
			}

			// Keys are grouped by name, so a new name means the previous
			// resource is complete and its latest version can be yielded.
			if lastSeenResource.Key.Name != dataKey.Name {
				value, err := k.getValueFromDataStore(ctx, lastSeenDataKey)
				if err != nil {
					yield(&ModifiedResource{}, err)
					return
				}

				lastSeenResource.Value = value

				if !yield(lastSeenResource, nil) {
					return
				}
			}

			// Track the most recent version seen for the current resource.
			lastSeenResource = &ModifiedResource{
				Key: resourcepb.ResourceKey{
					Namespace: dataKey.Namespace,
					Group:     dataKey.Group,
					Resource:  dataKey.Resource,
					Name:      dataKey.Name,
				},
				ResourceVersion: dataKey.ResourceVersion,
				Action:          convertEventType(dataKey.Action),
			}
			lastSeenDataKey = dataKey
		}

		// Yield the final resource, if any.
		if lastSeenResource != nil {
			value, err := k.getValueFromDataStore(ctx, lastSeenDataKey)
			if err != nil {
				yield(&ModifiedResource{}, err)
				return
			}

			lastSeenResource.Value = value

			yield(lastSeenResource, nil)
		}
	}
}
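
// listModifiedSinceEventStore replays recent events and yields the latest
// version of each matching resource modified at or after sinceRv.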
func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] {
	return func(yield func(*ModifiedResource, error) bool) {
		// store all events ordered by RV for the given tenant here
		eventKeys := make([]EventKey, 0)
		for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, sinceRv-defaultLookbackPeriod.Nanoseconds()) {
			if err != nil {
				yield(&ModifiedResource{}, err)
				return
			}

			evtKey, err := ParseEventKey(evtKeyStr)
			if err != nil {
				yield(&ModifiedResource{}, err)
				return
			}

			if evtKey.ResourceVersion < sinceRv {
				continue
			}

			if evtKey.Group != key.Group || evtKey.Resource != key.Resource || evtKey.Namespace != key.Namespace {
				continue
			}

			eventKeys = append(eventKeys, evtKey)
		}

		// we only care about the latest revision of every resource in the list
		seen := make(map[string]struct{})
		for i := len(eventKeys) - 1; i >= 0; i-- {
			evtKey := eventKeys[i]
			if _, ok := seen[evtKey.Name]; ok {
				continue
			}
			seen[evtKey.Name] = struct{}{}

			value, err := k.getValueFromDataStore(ctx, DataKey{
				Group:           evtKey.Group,
				Resource:        evtKey.Resource,
				Namespace:       evtKey.Namespace,
				Name:            evtKey.Name,
				ResourceVersion: evtKey.ResourceVersion,
				Action:          evtKey.Action,
			})
			if err != nil {
				yield(&ModifiedResource{}, err)
				return
			}

			if !yield(&ModifiedResource{
				Key: resourcepb.ResourceKey{
					Group:     evtKey.Group,
					Resource:  evtKey.Resource,
					Namespace: evtKey.Namespace,
					Name:      evtKey.Name,
				},
				Action:          convertEventType(evtKey.Action),
				ResourceVersion: evtKey.ResourceVersion,
				Value:           value,
			}, nil) {
				return
			}
		}
	}
}

// ListHistory is like ListIterator, but it returns the history of a resource.
func (k *kvStorageBackend) ListHistory(ctx context.Context, req *resourcepb.ListRequest, fn func(ListIterator) error) (int64, error) {
	if err := validateListHistoryRequest(req); err != nil {
		return 0, err
	}
	key := req.Options.Key
	// Parse continue token if provided
	lastSeenRV := int64(0)
	sortAscending := req.GetVersionMatchV2() == resourcepb.ResourceVersionMatchV2_NotOlderThan
	if req.NextPageToken != "" {
		token, err := GetContinueToken(req.NextPageToken)
		if err != nil {
			return 0, fmt.Errorf("invalid continue token: %w", err)
		}
		lastSeenRV = token.ResourceVersion
		sortAscending = token.SortAscending
	}

	// Generate a new resource version for the list
	listRV := k.snowflake.Generate().Int64()

	// Get all history entries by iterating through datastore keys
	historyKeys := make([]DataKey, 0, min(defaultListBufferSize, req.Limit+1))

	// Use datastore.Keys to get all data keys for this specific resource
	for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{
		Namespace: key.Namespace,
		Group:     key.Group,
		Resource:  key.Resource,
		Name:      key.Name,
	}, SortOrderAsc) {
		if err != nil {
			return 0, err
		}
		historyKeys = append(historyKeys, dataKey)
	}

	// Check if context has been cancelled
	if ctx.Err() != nil {
		return 0, ctx.Err()
	}

	// Handle trash differently from regular history
	if req.Source == resourcepb.ListRequest_TRASH {
		return k.processTrashEntries(ctx, req, fn, historyKeys, lastSeenRV, sortAscending, listRV)
	}

	// Apply filtering based on version match
	filteredKeys, filterErr := filterHistoryKeysByVersion(historyKeys, req)
	if filterErr != nil {
		return 0, filterErr
	}

	// Apply "live" history logic: ignore events before the last delete
	filteredKeys = applyLiveHistoryFilter(filteredKeys, req)

	// Sort the entries if not already sorted correctly
	sortByResourceVersion(filteredKeys, sortAscending)

	// Pagination: filter out items up to and including lastSeenRV
	pagedKeys := applyPagination(filteredKeys, lastSeenRV, sortAscending)

	iter := kvHistoryIterator{
		keys:          pagedKeys,
		currentIndex:  -1,
		ctx:           ctx,
		listRV:        listRV,
		sortAscending: sortAscending,
		dataStore:     k.dataStore,
	}

	err := fn(&iter)
	if err != nil {
		return 0, err
	}

	return listRV, nil
}

// processTrashEntries handles the special case of listing deleted items (trash)
func (k *kvStorageBackend) processTrashEntries(ctx context.Context, req *resourcepb.ListRequest, fn func(ListIterator) error, historyKeys []DataKey, lastSeenRV int64, sortAscending bool, listRV int64) (int64, error) {
	// Filter to only deleted entries
	deletedKeys := make([]DataKey, 0, len(historyKeys))
	for _, key := range historyKeys {
		if key.Action == DataActionDeleted {
			deletedKeys = append(deletedKeys, key)
		}
	}

	// Check if the resource currently exists (is live).
	// If it exists, don't return any trash entries.
	_, err := k.dataStore.GetLatestResourceKey(ctx, GetRequestKey{
		Group:     req.Options.Key.Group,
		Resource:  req.Options.Key.Resource,
		Namespace: req.Options.Key.Namespace,
		Name:      req.Options.Key.Name,
	})

	trashKeys := make([]DataKey, 0, 1)
	if errors.Is(err, ErrNotFound) {
		// Resource doesn't exist currently, so we can return the latest delete.
		// Find the latest delete event.
		var latestDelete *DataKey
		for _, key := range deletedKeys {
			if latestDelete == nil || key.ResourceVersion > latestDelete.ResourceVersion {
				latestDelete = &key
			}
		}
		if latestDelete != nil {
			trashKeys = append(trashKeys, *latestDelete)
		}
	}
	// If err != ErrNotFound, the resource exists, so no trash entries should be returned

	// Apply version filtering
	filteredKeys, err := filterHistoryKeysByVersion(trashKeys, req)
	if err != nil {
		return 0, err
	}

	// Sort the entries
	sortByResourceVersion(filteredKeys, sortAscending)

	// Pagination: filter out items up to and including lastSeenRV
	pagedKeys := applyPagination(filteredKeys, lastSeenRV, sortAscending)

	iter := kvHistoryIterator{
		keys:            pagedKeys,
		currentIndex:    -1,
		ctx:             ctx,
		listRV:          listRV,
		sortAscending:   sortAscending,
		dataStore:       k.dataStore,
		skipProvisioned: true,
	}

	err = fn(&iter)
	if err != nil {
		return 0, err
	}

	return listRV, nil
}

// kvHistoryIterator implements ListIterator for KV storage history
type kvHistoryIterator struct {
	ctx             context.Context
	keys            []DataKey
	currentIndex    int
	listRV          int64
	sortAscending   bool
	skipProvisioned bool
	dataStore       *dataStore

	// current
	rv     int64
	err    error
	value  []byte
	folder string
}
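
// Next advances to the next history entry, reading its value and folder
// metadata. Provisioned entries are skipped when skipProvisioned is set.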
func (i *kvHistoryIterator) Next() bool {
	i.currentIndex++

	if i.currentIndex >= len(i.keys) {
		return false
	}

	key := i.keys[i.currentIndex]
	i.rv = key.ResourceVersion

	// Read the value from the ReadCloser
	data, err := i.dataStore.Get(i.ctx, key)
	if err != nil {
		i.err = err
		return false
	}
	if data == nil {
		i.err = fmt.Errorf("data is nil")
		return false
	}
	i.value, i.err = readAndClose(data)
	if i.err != nil {
		return false
	}

	// Extract the folder from the metadata
	partial := &metav1.PartialObjectMetadata{}
	err = json.Unmarshal(i.value, partial)
	if err != nil {
		i.err = err
		return false
	}

	meta, err := utils.MetaAccessor(partial)
	if err != nil {
		i.err = err
		return false
	}
	i.folder = meta.GetFolder()
	i.err = nil

	// if the resource is provisioned and we are skipping provisioned resources, continue onto the next one
	if i.skipProvisioned && meta.GetAnnotation(utils.AnnoKeyManagerKind) != "" {
		return i.Next()
	}

	return true
}

func (i *kvHistoryIterator) Error() error {
	return i.err
}

func (i *kvHistoryIterator) ContinueToken() string {
	if i.currentIndex < 0 || i.currentIndex >= len(i.keys) {
		return ""
	}
	token := ContinueToken{
		StartOffset:     i.rv,
		ResourceVersion: i.keys[i.currentIndex].ResourceVersion,
		SortAscending:   i.sortAscending,
	}
	return token.String()
}

func (i *kvHistoryIterator) ResourceVersion() int64 {
	return i.rv
}

func (i *kvHistoryIterator) Namespace() string {
	if i.currentIndex >= 0 && i.currentIndex < len(i.keys) {
		return i.keys[i.currentIndex].Namespace
	}
	return ""
}

func (i *kvHistoryIterator) Name() string {
	if i.currentIndex >= 0 && i.currentIndex < len(i.keys) {
		return i.keys[i.currentIndex].Name
	}
	return ""
}

func (i *kvHistoryIterator) Folder() string {
	return i.folder
}

func (i *kvHistoryIterator) Value() []byte {
	return i.value
}

// WatchWriteEvents returns a channel that receives write events.
func (k *kvStorageBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) {
	// Create a channel to receive events
	events := make(chan *WrittenEvent, 10000) // TODO: make this configurable

	notifierEvents := k.notifier.Watch(ctx, defaultWatchOptions())
	go func() {
		for event := range notifierEvents {
			// fetch the data
			dataReader, err := k.dataStore.Get(ctx, DataKey{
				Group:           event.Group,
				Resource:        event.Resource,
				Namespace:       event.Namespace,
				Name:            event.Name,
				ResourceVersion: event.ResourceVersion,
				Action:          event.Action,
				Folder:          event.Folder,
			})
			if err != nil || dataReader == nil {
				k.log.Error("failed to get data for event", "error", err)
				continue
			}
			data, err := readAndClose(dataReader)
			if err != nil {
				k.log.Error("failed to read and close data for event", "error", err)
				continue
			}
			var t resourcepb.WatchEvent_Type
			switch event.Action {
			case DataActionCreated:
				t = resourcepb.WatchEvent_ADDED
			case DataActionUpdated:
				t = resourcepb.WatchEvent_MODIFIED
			case DataActionDeleted:
				t = resourcepb.WatchEvent_DELETED
			}

			events <- &WrittenEvent{
				Key: &resourcepb.ResourceKey{
					Namespace: event.Namespace,
					Group:     event.Group,
					Resource:  event.Resource,
					Name:      event.Name,
				},
				Type:            t,
				Folder:          event.Folder,
				Value:           data,
				ResourceVersion: event.ResourceVersion,
				PreviousRV:      event.PreviousRV,
				Timestamp:       event.ResourceVersion / time.Second.Nanoseconds(), // convert to seconds
			}
		}
		close(events)
	}()
	return events, nil
}

// GetResourceStats returns resource stats within the storage backend.
func (k *kvStorageBackend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error) {
	return k.dataStore.GetResourceStats(ctx, namespace, minCount)
}
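
// GetResourceLastImportTimes is not yet implemented for the KV backend; the
// returned sequence yields a single "not implemented" error.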
func (k *kvStorageBackend) GetResourceLastImportTimes(ctx context.Context) iter.Seq2[ResourceLastImportTime, error] {
	return func(yield func(ResourceLastImportTime, error) bool) {
		yield(ResourceLastImportTime{}, fmt.Errorf("not implemented"))
	}
}

// readAndClose reads all data from a ReadCloser and ensures it's closed,
// combining any errors from both operations.
func readAndClose(r io.ReadCloser) ([]byte, error) {
	data, err := io.ReadAll(r)
	return data, errors.Join(err, r.Close())
}