mirror of https://github.com/grafana/grafana.git
kvstore: merge the metadata store into the datastore (#110334)
* migrate eventstore to datastore * Add folder to event key * lint * lint * lint * lint * remove foundkye * refactor the Keys methods to move the Sort outside the ListKey method * remove bad import * fix missing params * lint * fix test * perf improvement
This commit is contained in:
parent
8a352cc352
commit
c251ebf4d5
|
@ -5,23 +5,31 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"iter"
|
||||
"math"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
gocache "github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
const (
|
||||
dataSection = "unified/data"
|
||||
// cache
|
||||
groupResourcesCacheKey = "group-resources"
|
||||
)
|
||||
|
||||
// dataStore is a data store that uses a KV store to store data.
|
||||
type dataStore struct {
|
||||
kv KV
|
||||
kv KV
|
||||
cache *gocache.Cache
|
||||
}
|
||||
|
||||
func newDataStore(kv KV) *dataStore {
|
||||
return &dataStore{
|
||||
kv: kv,
|
||||
kv: kv,
|
||||
cache: gocache.New(time.Hour, 10*time.Minute), // 1 hour expiration, 10 minute cleanup
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -37,6 +45,13 @@ type DataKey struct {
|
|||
Name string
|
||||
ResourceVersion int64
|
||||
Action DataAction
|
||||
Folder string
|
||||
}
|
||||
|
||||
// GroupResource represents a unique group/resource combination
|
||||
type GroupResource struct {
|
||||
Group string
|
||||
Resource string
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -46,40 +61,34 @@ var (
|
|||
)
|
||||
|
||||
func (k DataKey) String() string {
|
||||
return fmt.Sprintf("%s/%s/%s/%s/%d~%s", k.Namespace, k.Group, k.Resource, k.Name, k.ResourceVersion, k.Action)
|
||||
return fmt.Sprintf("%s/%s/%s/%s/%d~%s~%s", k.Group, k.Resource, k.Namespace, k.Name, k.ResourceVersion, k.Action, k.Folder)
|
||||
}
|
||||
|
||||
func (k DataKey) Equals(other DataKey) bool {
|
||||
return k.Namespace == other.Namespace && k.Group == other.Group && k.Resource == other.Resource && k.Name == other.Name && k.ResourceVersion == other.ResourceVersion && k.Action == other.Action
|
||||
return k.Group == other.Group && k.Resource == other.Resource && k.Namespace == other.Namespace && k.Name == other.Name && k.ResourceVersion == other.ResourceVersion && k.Action == other.Action && k.Folder == other.Folder
|
||||
}
|
||||
|
||||
func (k DataKey) Validate() error {
|
||||
if k.Namespace == "" {
|
||||
if k.Group != "" || k.Resource != "" || k.Name != "" {
|
||||
return fmt.Errorf("namespace is required when group, resource, or name are provided")
|
||||
}
|
||||
return fmt.Errorf("namespace cannot be empty")
|
||||
}
|
||||
if k.Group == "" {
|
||||
if k.Resource != "" || k.Name != "" {
|
||||
return fmt.Errorf("group is required when resource or name are provided")
|
||||
}
|
||||
return fmt.Errorf("group cannot be empty")
|
||||
return fmt.Errorf("group is required")
|
||||
}
|
||||
if k.Resource == "" {
|
||||
if k.Name != "" {
|
||||
return fmt.Errorf("resource is required when name is provided")
|
||||
}
|
||||
return fmt.Errorf("resource cannot be empty")
|
||||
return fmt.Errorf("resource is required")
|
||||
}
|
||||
if k.Namespace == "" {
|
||||
return fmt.Errorf("namespace is required")
|
||||
}
|
||||
if k.Name == "" {
|
||||
return fmt.Errorf("name cannot be empty")
|
||||
return fmt.Errorf("name is required")
|
||||
}
|
||||
if k.ResourceVersion <= 0 {
|
||||
return fmt.Errorf("resource version must be positive")
|
||||
}
|
||||
if k.Action == "" {
|
||||
return fmt.Errorf("action cannot be empty")
|
||||
return fmt.Errorf("action is required")
|
||||
}
|
||||
|
||||
// Validate each field against the naming rules
|
||||
// Validate naming conventions for all required fields
|
||||
if !validNameRegex.MatchString(k.Namespace) {
|
||||
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
|
||||
}
|
||||
|
@ -93,6 +102,12 @@ func (k DataKey) Validate() error {
|
|||
return fmt.Errorf("name '%s' is invalid", k.Name)
|
||||
}
|
||||
|
||||
// Validate folder field if provided (optional field)
|
||||
if k.Folder != "" && !validNameRegex.MatchString(k.Folder) {
|
||||
return fmt.Errorf("folder '%s' is invalid", k.Folder)
|
||||
}
|
||||
|
||||
// Validate action is one of the valid values
|
||||
switch k.Action {
|
||||
case DataActionCreated, DataActionUpdated, DataActionDeleted:
|
||||
return nil
|
||||
|
@ -102,47 +117,23 @@ func (k DataKey) Validate() error {
|
|||
}
|
||||
|
||||
type ListRequestKey struct {
|
||||
Namespace string
|
||||
Group string
|
||||
Resource string
|
||||
Name string
|
||||
Sort SortOrder
|
||||
Namespace string
|
||||
Name string // optional for listing multiple resources
|
||||
}
|
||||
|
||||
func (k ListRequestKey) Validate() error {
|
||||
// Check hierarchical validation - if a field is empty, more specific fields should also be empty
|
||||
if k.Namespace == "" {
|
||||
if k.Group != "" || k.Resource != "" || k.Name != "" {
|
||||
return fmt.Errorf("namespace is required when group, resource, or name are provided")
|
||||
}
|
||||
return nil // Empty namespace is allowed for ListRequestKey
|
||||
}
|
||||
if k.Group == "" {
|
||||
if k.Resource != "" || k.Name != "" {
|
||||
return fmt.Errorf("group is required when resource or name are provided")
|
||||
}
|
||||
// Only validate namespace if it's provided
|
||||
if !validNameRegex.MatchString(k.Namespace) {
|
||||
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("group is required")
|
||||
}
|
||||
if k.Resource == "" {
|
||||
if k.Name != "" {
|
||||
return fmt.Errorf("resource is required when name is provided")
|
||||
}
|
||||
// Validate namespace and group if they're provided
|
||||
if !validNameRegex.MatchString(k.Namespace) {
|
||||
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Group) {
|
||||
return fmt.Errorf("group '%s' is invalid", k.Group)
|
||||
}
|
||||
return nil
|
||||
return fmt.Errorf("resource is required")
|
||||
}
|
||||
|
||||
// All fields are provided, validate each one
|
||||
if !validNameRegex.MatchString(k.Namespace) {
|
||||
if k.Namespace == "" && k.Name != "" {
|
||||
return fmt.Errorf("name must be empty when namespace is empty")
|
||||
}
|
||||
if k.Namespace != "" && !validNameRegex.MatchString(k.Namespace) {
|
||||
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Group) {
|
||||
|
@ -154,24 +145,62 @@ func (k ListRequestKey) Validate() error {
|
|||
if k.Name != "" && !validNameRegex.MatchString(k.Name) {
|
||||
return fmt.Errorf("name '%s' is invalid", k.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k ListRequestKey) Prefix() string {
|
||||
if k.Namespace == "" {
|
||||
return ""
|
||||
}
|
||||
if k.Group == "" {
|
||||
return fmt.Sprintf("%s/", k.Namespace)
|
||||
}
|
||||
if k.Resource == "" {
|
||||
return fmt.Sprintf("%s/%s/", k.Namespace, k.Group)
|
||||
return fmt.Sprintf("%s/%s/", k.Group, k.Resource)
|
||||
}
|
||||
if k.Name == "" {
|
||||
return fmt.Sprintf("%s/%s/%s/", k.Namespace, k.Group, k.Resource)
|
||||
return fmt.Sprintf("%s/%s/%s/", k.Group, k.Resource, k.Namespace)
|
||||
}
|
||||
return fmt.Sprintf("%s/%s/%s/%s/", k.Namespace, k.Group, k.Resource, k.Name)
|
||||
return fmt.Sprintf("%s/%s/%s/%s/", k.Group, k.Resource, k.Namespace, k.Name)
|
||||
}
|
||||
|
||||
// GetRequestKey is used for getting a specific data object by latest version
|
||||
type GetRequestKey struct {
|
||||
Group string
|
||||
Resource string
|
||||
Namespace string
|
||||
Name string
|
||||
}
|
||||
|
||||
// Validate validates the get request key
|
||||
func (k GetRequestKey) Validate() error {
|
||||
if k.Group == "" {
|
||||
return fmt.Errorf("group is required")
|
||||
}
|
||||
if k.Resource == "" {
|
||||
return fmt.Errorf("resource is required")
|
||||
}
|
||||
if k.Namespace == "" {
|
||||
return fmt.Errorf("namespace is required")
|
||||
}
|
||||
if k.Name == "" {
|
||||
return fmt.Errorf("name is required")
|
||||
}
|
||||
|
||||
// Validate naming conventions
|
||||
if !validNameRegex.MatchString(k.Namespace) {
|
||||
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Group) {
|
||||
return fmt.Errorf("group '%s' is invalid", k.Group)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Resource) {
|
||||
return fmt.Errorf("resource '%s' is invalid", k.Resource)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Name) {
|
||||
return fmt.Errorf("name '%s' is invalid", k.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Prefix returns the prefix for getting a specific data object
|
||||
func (k GetRequestKey) Prefix() string {
|
||||
return fmt.Sprintf("%s/%s/%s/%s/", k.Group, k.Resource, k.Namespace, k.Name)
|
||||
}
|
||||
|
||||
type DataAction string
|
||||
|
@ -183,19 +212,18 @@ const (
|
|||
)
|
||||
|
||||
// Keys returns all keys for a given key by iterating through the KV store
|
||||
func (d *dataStore) Keys(ctx context.Context, key ListRequestKey) iter.Seq2[DataKey, error] {
|
||||
func (d *dataStore) Keys(ctx context.Context, key ListRequestKey, sort SortOrder) iter.Seq2[DataKey, error] {
|
||||
if err := key.Validate(); err != nil {
|
||||
return func(yield func(DataKey, error) bool) {
|
||||
yield(DataKey{}, err)
|
||||
}
|
||||
}
|
||||
|
||||
prefix := key.Prefix()
|
||||
return func(yield func(DataKey, error) bool) {
|
||||
for k, err := range d.kv.Keys(ctx, dataSection, ListOptions{
|
||||
StartKey: prefix,
|
||||
EndKey: PrefixRangeEnd(prefix),
|
||||
Sort: key.Sort,
|
||||
Sort: sort,
|
||||
}) {
|
||||
if err != nil {
|
||||
yield(DataKey{}, err)
|
||||
|
@ -236,6 +264,123 @@ func (d *dataStore) LastResourceVersion(ctx context.Context, key ListRequestKey)
|
|||
return DataKey{}, ErrNotFound
|
||||
}
|
||||
|
||||
// GetLatestResourceKey retrieves the data key for the latest version of a resource.
|
||||
// Returns the key with the highest resource version that is not deleted.
|
||||
func (d *dataStore) GetLatestResourceKey(ctx context.Context, key GetRequestKey) (DataKey, error) {
|
||||
return d.GetResourceKeyAtRevision(ctx, key, 0)
|
||||
}
|
||||
|
||||
// GetResourceKeyAtRevision retrieves the data key for a resource at a specific revision.
|
||||
// If rv is 0, it returns the latest version. Returns the highest version <= rv that is not deleted.
|
||||
func (d *dataStore) GetResourceKeyAtRevision(ctx context.Context, key GetRequestKey, rv int64) (DataKey, error) {
|
||||
if err := key.Validate(); err != nil {
|
||||
return DataKey{}, fmt.Errorf("invalid get request key: %w", err)
|
||||
}
|
||||
|
||||
if rv == 0 {
|
||||
rv = math.MaxInt64
|
||||
}
|
||||
|
||||
listKey := ListRequestKey(key)
|
||||
|
||||
iter := d.ListResourceKeysAtRevision(ctx, listKey, rv)
|
||||
for dataKey, err := range iter {
|
||||
if err != nil {
|
||||
return DataKey{}, err
|
||||
}
|
||||
return dataKey, nil
|
||||
}
|
||||
return DataKey{}, ErrNotFound
|
||||
}
|
||||
|
||||
// ListLatestResourceKeys returns an iterator over the data keys for the latest versions of resources.
|
||||
// Only returns keys for resources that are not deleted.
|
||||
func (d *dataStore) ListLatestResourceKeys(ctx context.Context, key ListRequestKey) iter.Seq2[DataKey, error] {
|
||||
return d.ListResourceKeysAtRevision(ctx, key, 0)
|
||||
}
|
||||
|
||||
// ListResourceKeysAtRevision returns an iterator over data keys for resources at a specific revision.
|
||||
// If rv is 0, it returns the latest versions. Only returns keys for resources that are not deleted at the given revision.
|
||||
func (d *dataStore) ListResourceKeysAtRevision(ctx context.Context, key ListRequestKey, rv int64) iter.Seq2[DataKey, error] {
|
||||
if err := key.Validate(); err != nil {
|
||||
return func(yield func(DataKey, error) bool) {
|
||||
yield(DataKey{}, fmt.Errorf("invalid list request key: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
if rv == 0 {
|
||||
rv = math.MaxInt64
|
||||
}
|
||||
|
||||
prefix := key.Prefix()
|
||||
// List all keys in the prefix.
|
||||
iter := d.kv.Keys(ctx, dataSection, ListOptions{
|
||||
StartKey: prefix,
|
||||
EndKey: PrefixRangeEnd(prefix),
|
||||
Sort: SortOrderAsc,
|
||||
})
|
||||
|
||||
return func(yield func(DataKey, error) bool) {
|
||||
var candidateKey *DataKey // The current candidate key we are iterating over
|
||||
|
||||
// yieldCandidate is a helper function to yield results.
|
||||
// Won't yield if the resource was last deleted.
|
||||
yieldCandidate := func() bool {
|
||||
if candidateKey.Action == DataActionDeleted {
|
||||
// Skip because the resource was last deleted.
|
||||
return true
|
||||
}
|
||||
return yield(*candidateKey, nil)
|
||||
}
|
||||
|
||||
for key, err := range iter {
|
||||
if err != nil {
|
||||
yield(DataKey{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
dataKey, err := ParseKey(key)
|
||||
if err != nil {
|
||||
yield(DataKey{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
if candidateKey == nil {
|
||||
// Skip until we have our first candidate
|
||||
if dataKey.ResourceVersion <= rv {
|
||||
// New candidate found.
|
||||
candidateKey = &dataKey
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Should yield if either:
|
||||
// - We reached the next resource.
|
||||
// - We reached a resource version greater than the target resource version.
|
||||
if !dataKey.SameResource(*candidateKey) || dataKey.ResourceVersion > rv {
|
||||
if !yieldCandidate() {
|
||||
return
|
||||
}
|
||||
// If we moved to a different resource and the resource version matches, make it the new candidate
|
||||
if !dataKey.SameResource(*candidateKey) && dataKey.ResourceVersion <= rv {
|
||||
candidateKey = &dataKey
|
||||
} else {
|
||||
// If we moved to a different resource and the resource version does not match, reset the candidate
|
||||
candidateKey = nil
|
||||
}
|
||||
} else {
|
||||
// Update candidate to the current key (same resource, valid version)
|
||||
candidateKey = &dataKey
|
||||
}
|
||||
}
|
||||
if candidateKey != nil {
|
||||
// Yield the last selected object
|
||||
if !yieldCandidate() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dataStore) Get(ctx context.Context, key DataKey) (io.ReadCloser, error) {
|
||||
if err := key.Validate(); err != nil {
|
||||
return nil, fmt.Errorf("invalid data key: %w", err)
|
||||
|
@ -276,20 +421,212 @@ func ParseKey(key string) (DataKey, error) {
|
|||
if len(parts) != 5 {
|
||||
return DataKey{}, fmt.Errorf("invalid key: %s", key)
|
||||
}
|
||||
uidActionParts := strings.Split(parts[4], "~")
|
||||
if len(uidActionParts) != 2 {
|
||||
rvActionFolderParts := strings.Split(parts[4], "~")
|
||||
if len(rvActionFolderParts) != 3 {
|
||||
return DataKey{}, fmt.Errorf("invalid key: %s", key)
|
||||
}
|
||||
rv, err := strconv.ParseInt(uidActionParts[0], 10, 64)
|
||||
rv, err := strconv.ParseInt(rvActionFolderParts[0], 10, 64)
|
||||
if err != nil {
|
||||
return DataKey{}, fmt.Errorf("invalid resource version: %s", uidActionParts[0])
|
||||
return DataKey{}, fmt.Errorf("invalid resource version '%s' in key %s: %w", rvActionFolderParts[0], key, err)
|
||||
}
|
||||
return DataKey{
|
||||
Namespace: parts[0],
|
||||
Group: parts[1],
|
||||
Resource: parts[2],
|
||||
Group: parts[0],
|
||||
Resource: parts[1],
|
||||
Namespace: parts[2],
|
||||
Name: parts[3],
|
||||
ResourceVersion: rv,
|
||||
Action: DataAction(uidActionParts[1]),
|
||||
Action: DataAction(rvActionFolderParts[1]),
|
||||
Folder: rvActionFolderParts[2],
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SameResource checks if this key represents the same resource as another key.
|
||||
// It compares the identifying fields: Group, Resource, Namespace, and Name.
|
||||
// ResourceVersion, Action, and Folder are ignored as they don't identify the resource itself.
|
||||
func (k DataKey) SameResource(other DataKey) bool {
|
||||
return k.Group == other.Group &&
|
||||
k.Resource == other.Resource &&
|
||||
k.Namespace == other.Namespace &&
|
||||
k.Name == other.Name
|
||||
}
|
||||
|
||||
// GetResourceStats returns resource stats within the data store by first discovering
|
||||
// all group/resource combinations, then issuing targeted list operations for each one.
|
||||
// If namespace is provided, only keys matching that namespace are considered.
|
||||
func (d *dataStore) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error) {
|
||||
// First, get all unique group/resource combinations in the store
|
||||
groupResources, err := d.getGroupResources(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get group resources: %w", err)
|
||||
}
|
||||
|
||||
var stats []ResourceStats
|
||||
|
||||
// Process each group/resource combination
|
||||
for _, groupResource := range groupResources {
|
||||
groupStats, err := d.processGroupResourceStats(ctx, groupResource, namespace, minCount)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to process stats for %s/%s: %w", groupResource.Group, groupResource.Resource, err)
|
||||
}
|
||||
stats = append(stats, groupStats...)
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// processGroupResourceStats processes stats for a specific group/resource combination
|
||||
func (d *dataStore) processGroupResourceStats(ctx context.Context, groupResource GroupResource, namespace string, minCount int) ([]ResourceStats, error) {
|
||||
// Use ListRequestKey to construct the appropriate prefix
|
||||
listKey := ListRequestKey{
|
||||
Group: groupResource.Group,
|
||||
Resource: groupResource.Resource,
|
||||
Namespace: namespace, // Empty string if not specified, which will list all namespaces
|
||||
}
|
||||
|
||||
// Maps to track counts per namespace for this group/resource
|
||||
namespaceCounts := make(map[string]int64) // namespace -> count of existing resources
|
||||
namespaceVersions := make(map[string]int64) // namespace -> latest resource version
|
||||
|
||||
// Track current resource being processed
|
||||
var currentResourceKey string
|
||||
var lastDataKey *DataKey
|
||||
|
||||
// Helper function to process the last seen resource
|
||||
processLastResource := func() {
|
||||
if lastDataKey != nil {
|
||||
// Initialize namespace version if not exists
|
||||
if _, exists := namespaceVersions[lastDataKey.Namespace]; !exists {
|
||||
namespaceVersions[lastDataKey.Namespace] = 0
|
||||
}
|
||||
|
||||
// If resource exists (not deleted), increment the count for this namespace
|
||||
if lastDataKey.Action != DataActionDeleted {
|
||||
namespaceCounts[lastDataKey.Namespace]++
|
||||
}
|
||||
|
||||
// Update to latest resource version seen
|
||||
if lastDataKey.ResourceVersion > namespaceVersions[lastDataKey.Namespace] {
|
||||
namespaceVersions[lastDataKey.Namespace] = lastDataKey.ResourceVersion
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// List all keys using the existing Keys method
|
||||
for dataKey, err := range d.Keys(ctx, listKey, SortOrderAsc) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create unique resource identifier (namespace/group/resource/name)
|
||||
resourceKey := fmt.Sprintf("%s/%s/%s/%s", dataKey.Namespace, dataKey.Group, dataKey.Resource, dataKey.Name)
|
||||
|
||||
// If we've moved to a different resource, process the previous one
|
||||
if currentResourceKey != "" && resourceKey != currentResourceKey {
|
||||
processLastResource()
|
||||
}
|
||||
|
||||
// Update tracking variables for the current resource
|
||||
currentResourceKey = resourceKey
|
||||
lastDataKey = &dataKey
|
||||
}
|
||||
|
||||
// Process the final resource
|
||||
processLastResource()
|
||||
|
||||
// Convert namespace counts to ResourceStats
|
||||
stats := make([]ResourceStats, 0, len(namespaceCounts))
|
||||
for ns, count := range namespaceCounts {
|
||||
// Skip if count is below or equal to minimum
|
||||
if count <= int64(minCount) {
|
||||
continue
|
||||
}
|
||||
|
||||
stats = append(stats, ResourceStats{
|
||||
NamespacedResource: NamespacedResource{
|
||||
Namespace: ns,
|
||||
Group: groupResource.Group,
|
||||
Resource: groupResource.Resource,
|
||||
},
|
||||
Count: count,
|
||||
ResourceVersion: namespaceVersions[ns],
|
||||
})
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// getGroupResources returns all unique group/resource combinations in the data store.
|
||||
// It efficiently discovers these by using the key ordering and PrefixRangeEnd to jump
|
||||
// between different group/resource prefixes without iterating through all keys.
|
||||
// Results are cached to improve performance.
|
||||
func (d *dataStore) getGroupResources(ctx context.Context) ([]GroupResource, error) {
|
||||
// Check cache first
|
||||
if cached, found := d.cache.Get(groupResourcesCacheKey); found {
|
||||
if cachedResults, ok := cached.([]GroupResource); ok {
|
||||
return cachedResults, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Cache miss or invalid data, compute the results
|
||||
results := make([]GroupResource, 0)
|
||||
seenGroupResources := make(map[string]bool) // "group/resource" -> seen
|
||||
|
||||
startKey := ""
|
||||
|
||||
for {
|
||||
// List with limit 1 to get the next key
|
||||
var foundKey string
|
||||
|
||||
for key, err := range d.kv.Keys(ctx, dataSection, ListOptions{
|
||||
StartKey: startKey,
|
||||
Limit: 1,
|
||||
Sort: SortOrderAsc,
|
||||
}) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
foundKey = key
|
||||
break // Only process the first (and only) key
|
||||
}
|
||||
|
||||
// If no key found, we're done
|
||||
if foundKey == "" {
|
||||
break
|
||||
}
|
||||
|
||||
// Parse the key to extract group and resource
|
||||
dataKey, err := ParseKey(foundKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse key %s: %w", foundKey, err)
|
||||
}
|
||||
|
||||
// Create the group/resource identifier
|
||||
groupResourceKey := fmt.Sprintf("%s/%s", dataKey.Group, dataKey.Resource)
|
||||
|
||||
// Add to results if we haven't seen this group/resource combination before
|
||||
if !seenGroupResources[groupResourceKey] {
|
||||
seenGroupResources[groupResourceKey] = true
|
||||
//nolint:staticcheck // SA4010: wrongly assumes that this result of append is never used
|
||||
results = append(results, GroupResource{
|
||||
Group: dataKey.Group,
|
||||
Resource: dataKey.Resource,
|
||||
})
|
||||
}
|
||||
|
||||
// Compute the next starting point by finding the end of this group/resource prefix
|
||||
groupResourcePrefix := fmt.Sprintf("%s/%s/", dataKey.Group, dataKey.Resource)
|
||||
nextStartKey := PrefixRangeEnd(groupResourcePrefix)
|
||||
|
||||
// If we've reached the end of the key space, we're done
|
||||
if nextStartKey == "" {
|
||||
break
|
||||
}
|
||||
|
||||
startKey = nextStartKey
|
||||
}
|
||||
|
||||
// Cache the results using the default expiration (1 hour)
|
||||
d.cache.Set(groupResourcesCacheKey, results, gocache.DefaultExpiration)
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -28,10 +28,11 @@ type EventKey struct {
|
|||
Name string
|
||||
ResourceVersion int64
|
||||
Action DataAction
|
||||
Folder string
|
||||
}
|
||||
|
||||
func (k EventKey) String() string {
|
||||
return fmt.Sprintf("%d~%s~%s~%s~%s~%s", k.ResourceVersion, k.Namespace, k.Group, k.Resource, k.Name, k.Action)
|
||||
return fmt.Sprintf("%d~%s~%s~%s~%s~%s~%s", k.ResourceVersion, k.Namespace, k.Group, k.Resource, k.Name, k.Action, k.Folder)
|
||||
}
|
||||
|
||||
func (k EventKey) Validate() error {
|
||||
|
@ -53,7 +54,9 @@ func (k EventKey) Validate() error {
|
|||
if k.Action == "" {
|
||||
return fmt.Errorf("action cannot be empty")
|
||||
}
|
||||
|
||||
if k.Folder != "" && !validNameRegex.MatchString(k.Folder) {
|
||||
return fmt.Errorf("folder '%s' is invalid", k.Folder)
|
||||
}
|
||||
// Validate each field against the naming rules (reusing the regex from datastore.go)
|
||||
if !validNameRegex.MatchString(k.Namespace) {
|
||||
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
|
||||
|
@ -67,7 +70,9 @@ func (k EventKey) Validate() error {
|
|||
if !validNameRegex.MatchString(k.Name) {
|
||||
return fmt.Errorf("name '%s' is invalid", k.Name)
|
||||
}
|
||||
|
||||
if k.Folder != "" && !validNameRegex.MatchString(k.Folder) {
|
||||
return fmt.Errorf("folder '%s' is invalid", k.Folder)
|
||||
}
|
||||
switch k.Action {
|
||||
case DataActionCreated, DataActionUpdated, DataActionDeleted:
|
||||
default:
|
||||
|
@ -97,7 +102,7 @@ func newEventStore(kv KV) *eventStore {
|
|||
// ParseEventKey parses a key string back into an EventKey struct
|
||||
func ParseEventKey(key string) (EventKey, error) {
|
||||
parts := strings.Split(key, "~")
|
||||
if len(parts) != 6 {
|
||||
if len(parts) != 7 {
|
||||
return EventKey{}, fmt.Errorf("invalid key format: expected 6 parts, got %d", len(parts))
|
||||
}
|
||||
|
||||
|
@ -113,6 +118,7 @@ func ParseEventKey(key string) (EventKey, error) {
|
|||
Resource: parts[3],
|
||||
Name: parts[4],
|
||||
Action: DataAction(parts[5]),
|
||||
Folder: parts[6],
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -40,8 +40,9 @@ func TestEventKey_String(t *testing.T) {
|
|||
Name: "test-resource",
|
||||
ResourceVersion: 1000,
|
||||
Action: "created",
|
||||
Folder: "test-folder",
|
||||
},
|
||||
expected: "1000~default~apps~resource~test-resource~created",
|
||||
expected: "1000~default~apps~resource~test-resource~created~test-folder",
|
||||
},
|
||||
{
|
||||
name: "empty namespace",
|
||||
|
@ -52,8 +53,9 @@ func TestEventKey_String(t *testing.T) {
|
|||
Name: "test-resource",
|
||||
ResourceVersion: 2000,
|
||||
Action: "updated",
|
||||
Folder: "test-folder",
|
||||
},
|
||||
expected: "2000~~apps~resource~test-resource~updated",
|
||||
expected: "2000~~apps~resource~test-resource~updated~test-folder",
|
||||
},
|
||||
{
|
||||
name: "special characters in name",
|
||||
|
@ -64,8 +66,9 @@ func TestEventKey_String(t *testing.T) {
|
|||
Name: "test-resource-with-dashes",
|
||||
ResourceVersion: 3000,
|
||||
Action: "deleted",
|
||||
Folder: "test-folder",
|
||||
},
|
||||
expected: "3000~test-ns~apps~resource~test-resource-with-dashes~deleted",
|
||||
expected: "3000~test-ns~apps~resource~test-resource-with-dashes~deleted~test-folder",
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -86,7 +89,7 @@ func TestEventKey_Validate(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
name: "valid key",
|
||||
key: "1000~default~apps~resource~test-resource~created",
|
||||
key: "1000~default~apps~resource~test-resource~created~test-folder",
|
||||
expected: EventKey{
|
||||
ResourceVersion: 1000,
|
||||
Namespace: "default",
|
||||
|
@ -94,11 +97,12 @@ func TestEventKey_Validate(t *testing.T) {
|
|||
Resource: "resource",
|
||||
Name: "test-resource",
|
||||
Action: "created",
|
||||
Folder: "test-folder",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty namespace",
|
||||
key: "2000~~apps~resource~test-resource~updated",
|
||||
key: "2000~~apps~resource~test-resource~updated~",
|
||||
expected: EventKey{
|
||||
ResourceVersion: 2000,
|
||||
Namespace: "",
|
||||
|
@ -110,7 +114,7 @@ func TestEventKey_Validate(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "special characters in name",
|
||||
key: "3000~test-ns~apps~resource~test-resource-with-dashes~updated",
|
||||
key: "3000~test-ns~apps~resource~test-resource-with-dashes~updated~",
|
||||
expected: EventKey{
|
||||
ResourceVersion: 3000,
|
||||
Namespace: "test-ns",
|
||||
|
@ -122,17 +126,17 @@ func TestEventKey_Validate(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "invalid key - too few parts",
|
||||
key: "1000~default~apps~resource",
|
||||
key: "1000~default~apps~resource~",
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "invalid key - too many parts",
|
||||
key: "1000~default~apps~resource~test~extra~parts",
|
||||
key: "1000~default~apps~resource~test~extra~parts~",
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "invalid resource version",
|
||||
key: "invalid~default~apps~resource~test~cerated",
|
||||
key: "invalid~default~apps~resource~test~cerated~",
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
|
|
|
@ -1,391 +0,0 @@
|
|||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"iter"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
metaSection = "unified/meta"
|
||||
)
|
||||
|
||||
// Metadata store stores search documents for resources in unified storage.
|
||||
// The store keeps track of the latest versions of each resource.
|
||||
type MetaData struct {
|
||||
IndexableDocument
|
||||
}
|
||||
|
||||
type MetaDataKey struct {
|
||||
Namespace string
|
||||
Group string
|
||||
Resource string
|
||||
Name string
|
||||
ResourceVersion int64
|
||||
Folder string
|
||||
Action DataAction
|
||||
}
|
||||
|
||||
// String returns the string representation of the MetaDataKey used as the storage key
|
||||
func (k MetaDataKey) String() string {
|
||||
return fmt.Sprintf("%s/%s/%s/%s/%d~%s~%s", k.Group, k.Resource, k.Namespace, k.Name, k.ResourceVersion, k.Action, k.Folder)
|
||||
}
|
||||
|
||||
// Validate validates that all required fields are present and valid
|
||||
func (k MetaDataKey) Validate() error {
|
||||
if k.Group == "" {
|
||||
return fmt.Errorf("group is required")
|
||||
}
|
||||
if k.Resource == "" {
|
||||
return fmt.Errorf("resource is required")
|
||||
}
|
||||
if k.Namespace == "" {
|
||||
return fmt.Errorf("namespace is required")
|
||||
}
|
||||
if k.Name == "" {
|
||||
return fmt.Errorf("name is required")
|
||||
}
|
||||
if k.ResourceVersion <= 0 {
|
||||
return fmt.Errorf("resource version must be positive")
|
||||
}
|
||||
if k.Action == "" {
|
||||
return fmt.Errorf("action is required")
|
||||
}
|
||||
|
||||
// Validate naming conventions for all required fields
|
||||
if !validNameRegex.MatchString(k.Namespace) {
|
||||
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Group) {
|
||||
return fmt.Errorf("group '%s' is invalid", k.Group)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Resource) {
|
||||
return fmt.Errorf("resource '%s' is invalid", k.Resource)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Name) {
|
||||
return fmt.Errorf("name '%s' is invalid", k.Name)
|
||||
}
|
||||
|
||||
// Validate folder field if provided (optional field)
|
||||
if k.Folder != "" && !validNameRegex.MatchString(k.Folder) {
|
||||
return fmt.Errorf("folder '%s' is invalid", k.Folder)
|
||||
}
|
||||
|
||||
// Validate action is one of the valid values
|
||||
switch k.Action {
|
||||
case DataActionCreated, DataActionUpdated, DataActionDeleted:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("action '%s' is invalid: must be one of 'created', 'updated', or 'deleted'", k.Action)
|
||||
}
|
||||
}
|
||||
|
||||
// MetaListRequestKey is used for listing metadata objects
|
||||
type MetaListRequestKey struct {
|
||||
Namespace string
|
||||
Group string
|
||||
Resource string
|
||||
Name string // optional for listing multiple resources
|
||||
}
|
||||
|
||||
// Validate validates the list request key
|
||||
func (k MetaListRequestKey) Validate() error {
|
||||
if k.Group == "" {
|
||||
return fmt.Errorf("group is required")
|
||||
}
|
||||
if k.Resource == "" {
|
||||
return fmt.Errorf("resource is required")
|
||||
}
|
||||
|
||||
// If namespace is empty, name must also be empty
|
||||
if k.Namespace == "" && k.Name != "" {
|
||||
return fmt.Errorf("name must be empty when namespace is empty")
|
||||
}
|
||||
|
||||
// Validate naming conventions
|
||||
if k.Namespace != "" && !validNameRegex.MatchString(k.Namespace) {
|
||||
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Group) {
|
||||
return fmt.Errorf("group '%s' is invalid", k.Group)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Resource) {
|
||||
return fmt.Errorf("resource '%s' is invalid", k.Resource)
|
||||
}
|
||||
if k.Name != "" && !validNameRegex.MatchString(k.Name) {
|
||||
return fmt.Errorf("name '%s' is invalid", k.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Prefix returns the prefix for listing metadata objects
|
||||
func (k MetaListRequestKey) Prefix() string {
|
||||
if k.Name == "" {
|
||||
if k.Namespace == "" {
|
||||
return fmt.Sprintf("%s/%s/", k.Group, k.Resource)
|
||||
}
|
||||
return fmt.Sprintf("%s/%s/%s/", k.Group, k.Resource, k.Namespace)
|
||||
}
|
||||
if k.Namespace == "" {
|
||||
return fmt.Sprintf("%s/%s/%s/", k.Group, k.Resource, k.Name)
|
||||
}
|
||||
return fmt.Sprintf("%s/%s/%s/%s/", k.Group, k.Resource, k.Namespace, k.Name)
|
||||
}
|
||||
|
||||
// MetaGetRequestKey is used for getting a specific metadata object by latest version
|
||||
type MetaGetRequestKey struct {
|
||||
Namespace string
|
||||
Group string
|
||||
Resource string
|
||||
Name string
|
||||
}
|
||||
|
||||
// Validate validates the get request key
|
||||
func (k MetaGetRequestKey) Validate() error {
|
||||
if k.Group == "" {
|
||||
return fmt.Errorf("group is required")
|
||||
}
|
||||
if k.Resource == "" {
|
||||
return fmt.Errorf("resource is required")
|
||||
}
|
||||
if k.Namespace == "" {
|
||||
return fmt.Errorf("namespace is required")
|
||||
}
|
||||
if k.Name == "" {
|
||||
return fmt.Errorf("name is required")
|
||||
}
|
||||
|
||||
// Validate naming conventions
|
||||
if !validNameRegex.MatchString(k.Namespace) {
|
||||
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Group) {
|
||||
return fmt.Errorf("group '%s' is invalid", k.Group)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Resource) {
|
||||
return fmt.Errorf("resource '%s' is invalid", k.Resource)
|
||||
}
|
||||
if !validNameRegex.MatchString(k.Name) {
|
||||
return fmt.Errorf("name '%s' is invalid", k.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Prefix returns the prefix for getting a specific metadata object
|
||||
func (k MetaGetRequestKey) Prefix() string {
|
||||
return fmt.Sprintf("%s/%s/%s/%s/", k.Group, k.Resource, k.Namespace, k.Name)
|
||||
}
|
||||
|
||||
type MetaDataObj struct {
|
||||
Key MetaDataKey
|
||||
Value MetaData
|
||||
}
|
||||
|
||||
type metadataStore struct {
|
||||
kv KV
|
||||
}
|
||||
|
||||
// newMetadataStore creates a new metadata store instance with the given key-value store backend.
|
||||
func newMetadataStore(kv KV) *metadataStore {
|
||||
return &metadataStore{
|
||||
kv: kv,
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves the metadata for a specific metadata key.
|
||||
// It validates the key and returns the raw metadata content.
|
||||
func (d *metadataStore) Get(ctx context.Context, key MetaDataKey) (MetaData, error) {
|
||||
if err := key.Validate(); err != nil {
|
||||
return MetaData{}, fmt.Errorf("invalid metadata key: %w", err)
|
||||
}
|
||||
|
||||
reader, err := d.kv.Get(ctx, metaSection, key.String())
|
||||
if err != nil {
|
||||
return MetaData{}, err
|
||||
}
|
||||
defer func() {
|
||||
_ = reader.Close()
|
||||
}()
|
||||
var meta MetaData
|
||||
err = json.NewDecoder(reader).Decode(&meta)
|
||||
return meta, err
|
||||
}
|
||||
|
||||
// GetLatestResourceKey retrieves the metadata key for the latest version of a resource.
|
||||
// Returns the key with the highest resource version that is not deleted.
|
||||
func (d *metadataStore) GetLatestResourceKey(ctx context.Context, key MetaGetRequestKey) (MetaDataKey, error) {
|
||||
return d.GetResourceKeyAtRevision(ctx, key, 0)
|
||||
}
|
||||
|
||||
// GetResourceKeyAtRevision retrieves the metadata key for a resource at a specific revision.
|
||||
// If rv is 0, it returns the latest version. Returns the highest version <= rv that is not deleted.
|
||||
func (d *metadataStore) GetResourceKeyAtRevision(ctx context.Context, key MetaGetRequestKey, rv int64) (MetaDataKey, error) {
|
||||
if err := key.Validate(); err != nil {
|
||||
return MetaDataKey{}, fmt.Errorf("invalid get request key: %w", err)
|
||||
}
|
||||
|
||||
if rv == 0 {
|
||||
rv = math.MaxInt64
|
||||
}
|
||||
|
||||
listKey := MetaListRequestKey(key)
|
||||
|
||||
iter := d.ListResourceKeysAtRevision(ctx, listKey, rv)
|
||||
for metaKey, err := range iter {
|
||||
if err != nil {
|
||||
return MetaDataKey{}, err
|
||||
}
|
||||
return metaKey, nil
|
||||
}
|
||||
return MetaDataKey{}, ErrNotFound
|
||||
}
|
||||
|
||||
// ListLatestResourceKeys returns an iterator over the metadata keys for the latest versions of resources.
|
||||
// Only returns keys for resources that are not deleted.
|
||||
func (d *metadataStore) ListLatestResourceKeys(ctx context.Context, key MetaListRequestKey) iter.Seq2[MetaDataKey, error] {
|
||||
return d.ListResourceKeysAtRevision(ctx, key, 0)
|
||||
}
|
||||
|
||||
// ListResourceKeysAtRevision returns an iterator over metadata keys for resources at a specific revision.
|
||||
// If rv is 0, it returns the latest versions. Only returns keys for resources that are not deleted at the given revision.
|
||||
func (d *metadataStore) ListResourceKeysAtRevision(ctx context.Context, key MetaListRequestKey, rv int64) iter.Seq2[MetaDataKey, error] {
|
||||
if err := key.Validate(); err != nil {
|
||||
return func(yield func(MetaDataKey, error) bool) {
|
||||
yield(MetaDataKey{}, fmt.Errorf("invalid list request key: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
if rv == 0 {
|
||||
rv = math.MaxInt64
|
||||
}
|
||||
|
||||
prefix := key.Prefix()
|
||||
// List all keys in the prefix.
|
||||
iter := d.kv.Keys(ctx, metaSection, ListOptions{
|
||||
StartKey: prefix,
|
||||
EndKey: PrefixRangeEnd(prefix),
|
||||
Sort: SortOrderAsc,
|
||||
})
|
||||
|
||||
return func(yield func(MetaDataKey, error) bool) {
|
||||
var candidateKey *MetaDataKey // The current candidate key we are iterating over
|
||||
|
||||
// yieldCandidate is a helper function to yield results.
|
||||
// Won't yield if the resource was last deleted.
|
||||
yieldCandidate := func() bool {
|
||||
if candidateKey.Action == DataActionDeleted {
|
||||
// Skip because the resource was last deleted.
|
||||
return true
|
||||
}
|
||||
return yield(*candidateKey, nil)
|
||||
}
|
||||
|
||||
for key, err := range iter {
|
||||
if err != nil {
|
||||
yield(MetaDataKey{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
metaKey, err := parseMetaDataKey(key)
|
||||
if err != nil {
|
||||
yield(MetaDataKey{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
if candidateKey == nil {
|
||||
// Skip until we have our first candidate
|
||||
if metaKey.ResourceVersion <= rv {
|
||||
// New candidate found.
|
||||
candidateKey = &metaKey
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Should yield if either:
|
||||
// - We reached the next resource.
|
||||
// - We reached a resource version greater than the target resource version.
|
||||
if !metaKey.SameResource(*candidateKey) || metaKey.ResourceVersion > rv {
|
||||
if !yieldCandidate() {
|
||||
return
|
||||
}
|
||||
// If we moved to a different resource and the resource version matches, make it the new candidate
|
||||
if !metaKey.SameResource(*candidateKey) && metaKey.ResourceVersion <= rv {
|
||||
candidateKey = &metaKey
|
||||
} else {
|
||||
// If we moved to a different resource and the resource version does not match, reset the candidate
|
||||
candidateKey = nil
|
||||
}
|
||||
} else {
|
||||
// Update candidate to the current key (same resource, valid version)
|
||||
candidateKey = &metaKey
|
||||
}
|
||||
}
|
||||
if candidateKey != nil {
|
||||
// Yield the last selected object
|
||||
if !yieldCandidate() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save stores a metadata object in the store.
|
||||
func (d *metadataStore) Save(ctx context.Context, obj MetaDataObj) error {
|
||||
if err := obj.Key.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid metadata key: %w", err)
|
||||
}
|
||||
|
||||
writer, err := d.kv.Save(ctx, metaSection, obj.Key.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
encoder := json.NewEncoder(writer)
|
||||
if err := encoder.Encode(obj.Value); err != nil {
|
||||
_ = writer.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return writer.Close()
|
||||
}
|
||||
|
||||
// parseMetaDataKey parses a string key into a MetaDataKey struct
|
||||
func parseMetaDataKey(key string) (MetaDataKey, error) {
|
||||
parts := strings.Split(key, "/")
|
||||
if len(parts) != 5 {
|
||||
return MetaDataKey{}, fmt.Errorf("invalid key: %s", key)
|
||||
}
|
||||
|
||||
rvActionFolderParts := strings.Split(parts[4], "~")
|
||||
if len(rvActionFolderParts) != 3 {
|
||||
return MetaDataKey{}, fmt.Errorf("invalid key: %s", key)
|
||||
}
|
||||
|
||||
rv, err := strconv.ParseInt(rvActionFolderParts[0], 10, 64)
|
||||
if err != nil {
|
||||
return MetaDataKey{}, fmt.Errorf("invalid resource version '%s' in key %s: %w", rvActionFolderParts[0], key, err)
|
||||
}
|
||||
return MetaDataKey{
|
||||
Namespace: parts[2],
|
||||
Group: parts[0],
|
||||
Resource: parts[1],
|
||||
Name: parts[3],
|
||||
ResourceVersion: rv,
|
||||
Action: DataAction(rvActionFolderParts[1]),
|
||||
Folder: rvActionFolderParts[2],
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SameResource checks if this key represents the same resource as another key.
|
||||
// It compares the identifying fields: Namespace, Group, Resource, and Name.
|
||||
// ResourceVersion, Action, and Folder are ignored as they don't identify the resource itself.
|
||||
func (k MetaDataKey) SameResource(other MetaDataKey) bool {
|
||||
return k.Namespace == other.Namespace &&
|
||||
k.Group == other.Group &&
|
||||
k.Resource == other.Resource &&
|
||||
k.Name == other.Name
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -11,7 +11,6 @@ import (
|
|||
"math/rand/v2"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
|
@ -36,7 +35,6 @@ type kvStorageBackend struct {
|
|||
snowflake *snowflake.Node
|
||||
kv KV
|
||||
dataStore *dataStore
|
||||
metaStore *metadataStore
|
||||
eventStore *eventStore
|
||||
notifier *notifier
|
||||
builder DocumentBuilder
|
||||
|
@ -83,7 +81,6 @@ func NewKvStorageBackend(opts KvBackendOptions) (StorageBackend, error) {
|
|||
backend := &kvStorageBackend{
|
||||
kv: kv,
|
||||
dataStore: newDataStore(kv),
|
||||
metaStore: newMetadataStore(kv),
|
||||
eventStore: eventStore,
|
||||
notifier: newNotifier(eventStore, notifierOptions{}),
|
||||
snowflake: s,
|
||||
|
@ -139,16 +136,14 @@ func (k *kvStorageBackend) pruneEvents(ctx context.Context, key PruningKey) erro
|
|||
return fmt.Errorf("invalid pruning key, all fields must be set: %+v", key)
|
||||
}
|
||||
|
||||
listKey := ListRequestKey{
|
||||
counter := 0
|
||||
// iterate over all keys for the resource and delete versions beyond the latest 20
|
||||
for datakey, err := range k.dataStore.Keys(ctx, ListRequestKey{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Name: key.Name,
|
||||
Sort: SortOrderDesc,
|
||||
}
|
||||
counter := 0
|
||||
// iterate over all keys for the resource and delete versions beyond the latest 20
|
||||
for datakey, err := range k.dataStore.Keys(ctx, listKey) {
|
||||
}, SortOrderDesc) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -217,10 +212,10 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in
|
|||
case resourcepb.WatchEvent_ADDED:
|
||||
action = DataActionCreated
|
||||
// Check if resource already exists for create operations
|
||||
_, err := k.metaStore.GetLatestResourceKey(ctx, MetaGetRequestKey{
|
||||
Namespace: event.Key.Namespace,
|
||||
_, err := k.dataStore.GetLatestResourceKey(ctx, GetRequestKey{
|
||||
Group: event.Key.Group,
|
||||
Resource: event.Key.Resource,
|
||||
Namespace: event.Key.Namespace,
|
||||
Name: event.Key.Name,
|
||||
})
|
||||
if err == nil {
|
||||
|
@ -244,44 +239,20 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in
|
|||
return 0, fmt.Errorf("object is nil")
|
||||
}
|
||||
|
||||
// Build the search document
|
||||
doc, err := k.builder.BuildDocument(ctx, event.Key, rv, event.Value)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to build document: %w", err)
|
||||
}
|
||||
|
||||
// Write the data
|
||||
err = k.dataStore.Save(ctx, DataKey{
|
||||
Namespace: event.Key.Namespace,
|
||||
err := k.dataStore.Save(ctx, DataKey{
|
||||
Group: event.Key.Group,
|
||||
Resource: event.Key.Resource,
|
||||
Namespace: event.Key.Namespace,
|
||||
Name: event.Key.Name,
|
||||
ResourceVersion: rv,
|
||||
Action: action,
|
||||
Folder: obj.GetFolder(),
|
||||
}, bytes.NewReader(event.Value))
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to write data: %w", err)
|
||||
}
|
||||
|
||||
// Write metadata
|
||||
err = k.metaStore.Save(ctx, MetaDataObj{
|
||||
Key: MetaDataKey{
|
||||
Namespace: event.Key.Namespace,
|
||||
Group: event.Key.Group,
|
||||
Resource: event.Key.Resource,
|
||||
Name: event.Key.Name,
|
||||
ResourceVersion: rv,
|
||||
Action: action,
|
||||
Folder: obj.GetFolder(),
|
||||
},
|
||||
Value: MetaData{
|
||||
IndexableDocument: *doc,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to write metadata: %w", err)
|
||||
}
|
||||
|
||||
// Write event
|
||||
err = k.eventStore.Save(ctx, Event{
|
||||
Namespace: event.Key.Namespace,
|
||||
|
@ -311,10 +282,10 @@ func (k *kvStorageBackend) ReadResource(ctx context.Context, req *resourcepb.Rea
|
|||
if req.Key == nil {
|
||||
return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusBadRequest, Message: "missing key"}}
|
||||
}
|
||||
meta, err := k.metaStore.GetResourceKeyAtRevision(ctx, MetaGetRequestKey{
|
||||
Namespace: req.Key.Namespace,
|
||||
meta, err := k.dataStore.GetResourceKeyAtRevision(ctx, GetRequestKey{
|
||||
Group: req.Key.Group,
|
||||
Resource: req.Key.Resource,
|
||||
Namespace: req.Key.Namespace,
|
||||
Name: req.Key.Name,
|
||||
}, req.ResourceVersion)
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
|
@ -323,12 +294,13 @@ func (k *kvStorageBackend) ReadResource(ctx context.Context, req *resourcepb.Rea
|
|||
return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}}
|
||||
}
|
||||
data, err := k.dataStore.Get(ctx, DataKey{
|
||||
Namespace: req.Key.Namespace,
|
||||
Group: req.Key.Group,
|
||||
Resource: req.Key.Resource,
|
||||
Namespace: req.Key.Namespace,
|
||||
Name: req.Key.Name,
|
||||
ResourceVersion: meta.ResourceVersion,
|
||||
Action: meta.Action,
|
||||
Folder: meta.Folder,
|
||||
})
|
||||
if err != nil || data == nil {
|
||||
return &BackendReadResponse{Error: &resourcepb.ErrorResult{Code: http.StatusInternalServerError, Message: err.Error()}}
|
||||
|
@ -369,12 +341,12 @@ func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.Lis
|
|||
}
|
||||
|
||||
// Fetch the latest objects
|
||||
keys := make([]MetaDataKey, 0, min(defaultListBufferSize, req.Limit+1))
|
||||
keys := make([]DataKey, 0, min(defaultListBufferSize, req.Limit+1))
|
||||
idx := 0
|
||||
for metaKey, err := range k.metaStore.ListResourceKeysAtRevision(ctx, MetaListRequestKey{
|
||||
Namespace: req.Options.Key.Namespace,
|
||||
for dataKey, err := range k.dataStore.ListResourceKeysAtRevision(ctx, ListRequestKey{
|
||||
Group: req.Options.Key.Group,
|
||||
Resource: req.Options.Key.Resource,
|
||||
Namespace: req.Options.Key.Namespace,
|
||||
Name: req.Options.Key.Name,
|
||||
}, resourceVersion) {
|
||||
if err != nil {
|
||||
|
@ -385,7 +357,7 @@ func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.Lis
|
|||
idx++
|
||||
continue
|
||||
}
|
||||
keys = append(keys, metaKey)
|
||||
keys = append(keys, dataKey)
|
||||
// Only fetch the first limit items + 1 to get the next token.
|
||||
if len(keys) >= int(req.Limit+1) {
|
||||
break
|
||||
|
@ -411,7 +383,7 @@ func (k *kvStorageBackend) ListIterator(ctx context.Context, req *resourcepb.Lis
|
|||
// kvListIterator implements ListIterator for KV storage
|
||||
type kvListIterator struct {
|
||||
ctx context.Context
|
||||
keys []MetaDataKey
|
||||
keys []DataKey
|
||||
currentIndex int
|
||||
dataStore *dataStore
|
||||
listRV int64
|
||||
|
@ -437,14 +409,7 @@ func (i *kvListIterator) Next() bool {
|
|||
|
||||
i.rv, i.err = i.keys[i.currentIndex].ResourceVersion, nil
|
||||
|
||||
data, err := i.dataStore.Get(i.ctx, DataKey{
|
||||
Namespace: i.keys[i.currentIndex].Namespace,
|
||||
Group: i.keys[i.currentIndex].Group,
|
||||
Resource: i.keys[i.currentIndex].Resource,
|
||||
Name: i.keys[i.currentIndex].Name,
|
||||
ResourceVersion: i.keys[i.currentIndex].ResourceVersion,
|
||||
Action: i.keys[i.currentIndex].Action,
|
||||
})
|
||||
data, err := i.dataStore.Get(i.ctx, i.keys[i.currentIndex])
|
||||
if err != nil {
|
||||
i.err = err
|
||||
return false
|
||||
|
@ -519,7 +484,7 @@ func filterHistoryKeysByVersion(historyKeys []DataKey, req *resourcepb.ListReque
|
|||
if req.ResourceVersion <= 0 {
|
||||
return nil, fmt.Errorf("expecting an explicit resource version query when using Exact matching")
|
||||
}
|
||||
var exactKeys []DataKey
|
||||
exactKeys := make([]DataKey, 0, len(historyKeys))
|
||||
for _, key := range historyKeys {
|
||||
if key.ResourceVersion == req.ResourceVersion {
|
||||
exactKeys = append(exactKeys, key)
|
||||
|
@ -528,7 +493,7 @@ func filterHistoryKeysByVersion(historyKeys []DataKey, req *resourcepb.ListReque
|
|||
return exactKeys, nil
|
||||
case resourcepb.ResourceVersionMatchV2_NotOlderThan:
|
||||
if req.ResourceVersion > 0 {
|
||||
var filteredKeys []DataKey
|
||||
filteredKeys := make([]DataKey, 0, len(historyKeys))
|
||||
for _, key := range historyKeys {
|
||||
if key.ResourceVersion >= req.ResourceVersion {
|
||||
filteredKeys = append(filteredKeys, key)
|
||||
|
@ -538,7 +503,7 @@ func filterHistoryKeysByVersion(historyKeys []DataKey, req *resourcepb.ListReque
|
|||
}
|
||||
default:
|
||||
if req.ResourceVersion > 0 {
|
||||
var filteredKeys []DataKey
|
||||
filteredKeys := make([]DataKey, 0, len(historyKeys))
|
||||
for _, key := range historyKeys {
|
||||
if key.ResourceVersion <= req.ResourceVersion {
|
||||
filteredKeys = append(filteredKeys, key)
|
||||
|
@ -564,7 +529,7 @@ func applyLiveHistoryFilter(filteredKeys []DataKey, req *resourcepb.ListRequest)
|
|||
}
|
||||
}
|
||||
if latestDeleteRV > 0 {
|
||||
var liveKeys []DataKey
|
||||
liveKeys := make([]DataKey, 0, len(filteredKeys))
|
||||
for _, key := range filteredKeys {
|
||||
if key.ResourceVersion > latestDeleteRV {
|
||||
liveKeys = append(liveKeys, key)
|
||||
|
@ -594,7 +559,7 @@ func applyPagination(keys []DataKey, lastSeenRV int64, sortAscending bool) []Dat
|
|||
return keys
|
||||
}
|
||||
|
||||
var pagedKeys []DataKey
|
||||
pagedKeys := make([]DataKey, 0, len(keys))
|
||||
for _, key := range keys {
|
||||
if sortAscending && key.ResourceVersion > lastSeenRV {
|
||||
pagedKeys = append(pagedKeys, key)
|
||||
|
@ -666,7 +631,7 @@ func (k *kvStorageBackend) listModifiedSinceDataStore(ctx context.Context, key N
|
|||
return func(yield func(*ModifiedResource, error) bool) {
|
||||
var lastSeenResource *ModifiedResource
|
||||
var lastSeenDataKey DataKey
|
||||
for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{Namespace: key.Namespace, Group: key.Group, Resource: key.Resource}) {
|
||||
for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{Namespace: key.Namespace, Group: key.Group, Resource: key.Resource}, SortOrderAsc) {
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
return
|
||||
|
@ -767,7 +732,14 @@ func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key
|
|||
}
|
||||
seen[evtKey.Name] = struct{}{}
|
||||
|
||||
value, err := k.getValueFromDataStore(ctx, DataKey(evtKey))
|
||||
value, err := k.getValueFromDataStore(ctx, DataKey{
|
||||
Group: evtKey.Group,
|
||||
Resource: evtKey.Resource,
|
||||
Namespace: evtKey.Namespace,
|
||||
Name: evtKey.Name,
|
||||
ResourceVersion: evtKey.ResourceVersion,
|
||||
Action: evtKey.Action,
|
||||
})
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
return
|
||||
|
@ -820,7 +792,7 @@ func (k *kvStorageBackend) ListHistory(ctx context.Context, req *resourcepb.List
|
|||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Name: key.Name,
|
||||
}) {
|
||||
}, SortOrderAsc) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -872,7 +844,7 @@ func (k *kvStorageBackend) ListHistory(ctx context.Context, req *resourcepb.List
|
|||
// processTrashEntries handles the special case of listing deleted items (trash)
|
||||
func (k *kvStorageBackend) processTrashEntries(ctx context.Context, req *resourcepb.ListRequest, fn func(ListIterator) error, historyKeys []DataKey, lastSeenRV int64, sortAscending bool, listRV int64) (int64, error) {
|
||||
// Filter to only deleted entries
|
||||
var deletedKeys []DataKey
|
||||
deletedKeys := make([]DataKey, 0, len(historyKeys))
|
||||
for _, key := range historyKeys {
|
||||
if key.Action == DataActionDeleted {
|
||||
deletedKeys = append(deletedKeys, key)
|
||||
|
@ -881,14 +853,14 @@ func (k *kvStorageBackend) processTrashEntries(ctx context.Context, req *resourc
|
|||
|
||||
// Check if the resource currently exists (is live)
|
||||
// If it exists, don't return any trash entries
|
||||
_, err := k.metaStore.GetLatestResourceKey(ctx, MetaGetRequestKey{
|
||||
Namespace: req.Options.Key.Namespace,
|
||||
_, err := k.dataStore.GetLatestResourceKey(ctx, GetRequestKey{
|
||||
Group: req.Options.Key.Group,
|
||||
Resource: req.Options.Key.Resource,
|
||||
Namespace: req.Options.Key.Namespace,
|
||||
Name: req.Options.Key.Name,
|
||||
})
|
||||
|
||||
var trashKeys []DataKey
|
||||
trashKeys := make([]DataKey, 0, 1)
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
// Resource doesn't exist currently, so we can return the latest delete
|
||||
// Find the latest delete event
|
||||
|
@ -1045,12 +1017,13 @@ func (k *kvStorageBackend) WatchWriteEvents(ctx context.Context) (<-chan *Writte
|
|||
for event := range notifierEvents {
|
||||
// fetch the data
|
||||
dataReader, err := k.dataStore.Get(ctx, DataKey{
|
||||
Namespace: event.Namespace,
|
||||
Group: event.Group,
|
||||
Resource: event.Resource,
|
||||
Namespace: event.Namespace,
|
||||
Name: event.Name,
|
||||
ResourceVersion: event.ResourceVersion,
|
||||
Action: event.Action,
|
||||
Folder: event.Folder,
|
||||
})
|
||||
if err != nil || dataReader == nil {
|
||||
k.log.Error("failed to get data for event", "error", err)
|
||||
|
@ -1092,48 +1065,8 @@ func (k *kvStorageBackend) WatchWriteEvents(ctx context.Context) (<-chan *Writte
|
|||
}
|
||||
|
||||
// GetResourceStats returns resource stats within the storage backend.
|
||||
// TODO: this isn't very efficient, we should use a more efficient algorithm.
|
||||
func (k *kvStorageBackend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error) {
|
||||
stats := make([]ResourceStats, 0)
|
||||
res := make(map[string]map[string]bool)
|
||||
rvs := make(map[string]int64)
|
||||
|
||||
// Use datastore.Keys to get all data keys for the namespace
|
||||
for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{Namespace: namespace}) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key := fmt.Sprintf("%s/%s/%s", dataKey.Namespace, dataKey.Group, dataKey.Resource)
|
||||
if _, ok := res[key]; !ok {
|
||||
res[key] = make(map[string]bool)
|
||||
rvs[key] = 1
|
||||
}
|
||||
res[key][dataKey.Name] = dataKey.Action != DataActionDeleted
|
||||
rvs[key] = dataKey.ResourceVersion
|
||||
}
|
||||
|
||||
for key, names := range res {
|
||||
parts := strings.Split(key, "/")
|
||||
count := int64(0)
|
||||
for _, exists := range names {
|
||||
if exists {
|
||||
count++
|
||||
}
|
||||
}
|
||||
if count <= int64(minCount) {
|
||||
continue
|
||||
}
|
||||
stats = append(stats, ResourceStats{
|
||||
NamespacedResource: NamespacedResource{
|
||||
Namespace: parts[0],
|
||||
Group: parts[1],
|
||||
Resource: parts[2],
|
||||
},
|
||||
Count: count,
|
||||
ResourceVersion: rvs[key],
|
||||
})
|
||||
}
|
||||
return stats, nil
|
||||
return k.dataStore.GetResourceStats(ctx, namespace, minCount)
|
||||
}
|
||||
|
||||
// readAndClose reads all data from a ReadCloser and ensures it's closed,
|
||||
|
|
|
@ -43,7 +43,7 @@ func TestNewKvStorageBackend(t *testing.T) {
|
|||
assert.NotNil(t, backend)
|
||||
assert.NotNil(t, backend.kv)
|
||||
assert.NotNil(t, backend.dataStore)
|
||||
assert.NotNil(t, backend.metaStore)
|
||||
|
||||
assert.NotNil(t, backend.eventStore)
|
||||
assert.NotNil(t, backend.notifier)
|
||||
assert.NotNil(t, backend.snowflake)
|
||||
|
@ -126,25 +126,6 @@ func TestKvStorageBackend_WriteEvent_Success(t *testing.T) {
|
|||
require.NoError(t, dataReader.Close())
|
||||
assert.Equal(t, objectToJSONBytes(t, testObj), dataValue)
|
||||
|
||||
// Verify metadata was written to metaStore
|
||||
metaKey := MetaDataKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
ResourceVersion: rv,
|
||||
Action: expectedAction,
|
||||
Folder: "",
|
||||
}
|
||||
|
||||
m, err := backend.metaStore.Get(ctx, metaKey)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, m)
|
||||
require.Equal(t, "test-resource", m.Key.Name)
|
||||
require.Equal(t, "default", m.Key.Namespace)
|
||||
require.Equal(t, "apps", m.Key.Group)
|
||||
require.Equal(t, "resources", m.Key.Resource)
|
||||
|
||||
// Verify event was written to eventStore
|
||||
eventKey := EventKey{
|
||||
Namespace: "default",
|
||||
|
@ -1258,8 +1239,7 @@ func TestKvStorageBackend_PruneEvents(t *testing.T) {
|
|||
Group: "apps",
|
||||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
Sort: SortOrderDesc,
|
||||
}) {
|
||||
}, SortOrderDesc) {
|
||||
require.NoError(t, err)
|
||||
require.NotEqual(t, rv1, datakey.ResourceVersion)
|
||||
counter++
|
||||
|
@ -1325,8 +1305,7 @@ func TestKvStorageBackend_PruneEvents(t *testing.T) {
|
|||
Group: "apps",
|
||||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
Sort: SortOrderDesc,
|
||||
}) {
|
||||
}, SortOrderDesc) {
|
||||
require.NoError(t, err)
|
||||
counter++
|
||||
}
|
||||
|
@ -1393,8 +1372,7 @@ func TestKvStorageBackend_PruneEvents(t *testing.T) {
|
|||
Group: "apps",
|
||||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
Sort: SortOrderDesc,
|
||||
}) {
|
||||
}, SortOrderDesc) {
|
||||
require.NoError(t, err)
|
||||
counter++
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue