mirror of https://github.com/grafana/grafana.git
unified-storage: add ListSinceModified to kv store (#110250)
* implement ListKeysSince in event store implement data store version --------- Co-authored-by: Georges Chaudy <chaudyg@gmail.com>
This commit is contained in:
parent
eed8d189ac
commit
ea7c370edd
|
@ -24,10 +24,11 @@ type EventKey struct {
|
|||
Resource string
|
||||
Name string
|
||||
ResourceVersion int64
|
||||
Action DataAction
|
||||
}
|
||||
|
||||
func (k EventKey) String() string {
|
||||
return fmt.Sprintf("%d~%s~%s~%s~%s", k.ResourceVersion, k.Namespace, k.Group, k.Resource, k.Name)
|
||||
return fmt.Sprintf("%d~%s~%s~%s~%s~%s", k.ResourceVersion, k.Namespace, k.Group, k.Resource, k.Name, k.Action)
|
||||
}
|
||||
|
||||
func (k EventKey) Validate() error {
|
||||
|
@ -46,6 +47,9 @@ func (k EventKey) Validate() error {
|
|||
if k.ResourceVersion < 0 {
|
||||
return fmt.Errorf("resource version must be non-negative")
|
||||
}
|
||||
if k.Action == "" {
|
||||
return fmt.Errorf("action cannot be empty")
|
||||
}
|
||||
|
||||
// Validate each field against the naming rules (reusing the regex from datastore.go)
|
||||
if !validNameRegex.MatchString(k.Namespace) {
|
||||
|
@ -61,6 +65,12 @@ func (k EventKey) Validate() error {
|
|||
return fmt.Errorf("name '%s' is invalid", k.Name)
|
||||
}
|
||||
|
||||
switch k.Action {
|
||||
case DataActionCreated, DataActionUpdated, DataActionDeleted:
|
||||
default:
|
||||
return fmt.Errorf("action '%s' is invalid: must be one of 'created', 'updated', or 'deleted'", k.Action)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -84,8 +94,8 @@ func newEventStore(kv KV) *eventStore {
|
|||
// ParseEventKey parses a key string back into an EventKey struct
|
||||
func ParseEventKey(key string) (EventKey, error) {
|
||||
parts := strings.Split(key, "~")
|
||||
if len(parts) != 5 {
|
||||
return EventKey{}, fmt.Errorf("invalid key format: expected 5 parts, got %d", len(parts))
|
||||
if len(parts) != 6 {
|
||||
return EventKey{}, fmt.Errorf("invalid key format: expected 6 parts, got %d", len(parts))
|
||||
}
|
||||
|
||||
rv, err := strconv.ParseInt(parts[0], 10, 64)
|
||||
|
@ -99,6 +109,7 @@ func ParseEventKey(key string) (EventKey, error) {
|
|||
Group: parts[2],
|
||||
Resource: parts[3],
|
||||
Name: parts[4],
|
||||
Action: DataAction(parts[5]),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -127,6 +138,7 @@ func (n *eventStore) Save(ctx context.Context, event Event) error {
|
|||
Resource: event.Resource,
|
||||
Name: event.Name,
|
||||
ResourceVersion: event.ResourceVersion,
|
||||
Action: event.Action,
|
||||
}
|
||||
|
||||
if err := eventKey.Validate(); err != nil {
|
||||
|
@ -164,27 +176,47 @@ func (n *eventStore) Get(ctx context.Context, key EventKey) (Event, error) {
|
|||
}
|
||||
|
||||
// ListSince returns a sequence of events since the given resource version.
|
||||
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Event, error] {
|
||||
func (n *eventStore) ListKeysSince(ctx context.Context, sinceRV int64) iter.Seq2[string, error] {
|
||||
opts := ListOptions{
|
||||
Sort: SortOrderAsc,
|
||||
StartKey: EventKey{
|
||||
ResourceVersion: sinceRV,
|
||||
}.String(),
|
||||
}
|
||||
return func(yield func(string, error) bool) {
|
||||
for evtKey, err := range n.kv.Keys(ctx, eventsSection, opts) {
|
||||
if err != nil {
|
||||
yield("", err)
|
||||
return
|
||||
}
|
||||
if !yield(evtKey, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Event, error] {
|
||||
return func(yield func(Event, error) bool) {
|
||||
for key, err := range n.kv.Keys(ctx, eventsSection, opts) {
|
||||
for evtKey, err := range n.ListKeysSince(ctx, sinceRV) {
|
||||
if err != nil {
|
||||
yield(Event{}, err)
|
||||
return
|
||||
}
|
||||
reader, err := n.kv.Get(ctx, eventsSection, key)
|
||||
|
||||
reader, err := n.kv.Get(ctx, eventsSection, evtKey)
|
||||
if err != nil {
|
||||
yield(Event{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
var event Event
|
||||
if err := json.NewDecoder(reader).Decode(&event); err != nil {
|
||||
_ = reader.Close()
|
||||
yield(Event{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
_ = reader.Close()
|
||||
if !yield(event, nil) {
|
||||
return
|
||||
|
|
|
@ -38,8 +38,9 @@ func TestEventKey_String(t *testing.T) {
|
|||
Resource: "resource",
|
||||
Name: "test-resource",
|
||||
ResourceVersion: 1000,
|
||||
Action: "created",
|
||||
},
|
||||
expected: "1000~default~apps~resource~test-resource",
|
||||
expected: "1000~default~apps~resource~test-resource~created",
|
||||
},
|
||||
{
|
||||
name: "empty namespace",
|
||||
|
@ -49,8 +50,9 @@ func TestEventKey_String(t *testing.T) {
|
|||
Resource: "resource",
|
||||
Name: "test-resource",
|
||||
ResourceVersion: 2000,
|
||||
Action: "updated",
|
||||
},
|
||||
expected: "2000~~apps~resource~test-resource",
|
||||
expected: "2000~~apps~resource~test-resource~updated",
|
||||
},
|
||||
{
|
||||
name: "special characters in name",
|
||||
|
@ -60,8 +62,9 @@ func TestEventKey_String(t *testing.T) {
|
|||
Resource: "resource",
|
||||
Name: "test-resource-with-dashes",
|
||||
ResourceVersion: 3000,
|
||||
Action: "deleted",
|
||||
},
|
||||
expected: "3000~test-ns~apps~resource~test-resource-with-dashes",
|
||||
expected: "3000~test-ns~apps~resource~test-resource-with-dashes~deleted",
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -82,35 +85,38 @@ func TestEventKey_Validate(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
name: "valid key",
|
||||
key: "1000~default~apps~resource~test-resource",
|
||||
key: "1000~default~apps~resource~test-resource~created",
|
||||
expected: EventKey{
|
||||
ResourceVersion: 1000,
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Name: "test-resource",
|
||||
Action: "created",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty namespace",
|
||||
key: "2000~~apps~resource~test-resource",
|
||||
key: "2000~~apps~resource~test-resource~updated",
|
||||
expected: EventKey{
|
||||
ResourceVersion: 2000,
|
||||
Namespace: "",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Name: "test-resource",
|
||||
Action: "updated",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "special characters in name",
|
||||
key: "3000~test-ns~apps~resource~test-resource-with-dashes",
|
||||
key: "3000~test-ns~apps~resource~test-resource-with-dashes~updated",
|
||||
expected: EventKey{
|
||||
ResourceVersion: 3000,
|
||||
Namespace: "test-ns",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Name: "test-resource-with-dashes",
|
||||
Action: "updated",
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -120,12 +126,12 @@ func TestEventKey_Validate(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "invalid key - too many parts",
|
||||
key: "1000~default~apps~resource~test~extra",
|
||||
key: "1000~default~apps~resource~test~extra~parts",
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "invalid resource version",
|
||||
key: "invalid~default~apps~resource~test",
|
||||
key: "invalid~default~apps~resource~test~cerated",
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
|
@ -157,6 +163,7 @@ func TestEventStore_ParseEventKey(t *testing.T) {
|
|||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Name: "test-resource",
|
||||
Action: "created",
|
||||
}
|
||||
|
||||
// Convert to string and back
|
||||
|
@ -193,6 +200,7 @@ func TestEventStore_Save_Get(t *testing.T) {
|
|||
Resource: event.Resource,
|
||||
Name: event.Name,
|
||||
ResourceVersion: event.ResourceVersion,
|
||||
Action: event.Action,
|
||||
}
|
||||
|
||||
retrievedEvent, err := store.Get(ctx, eventKey)
|
||||
|
@ -210,6 +218,7 @@ func TestEventStore_Get_NotFound(t *testing.T) {
|
|||
Resource: "resource",
|
||||
Name: "non-existent",
|
||||
ResourceVersion: 9999,
|
||||
Action: "created",
|
||||
}
|
||||
|
||||
_, err := store.Get(ctx, nonExistentKey)
|
||||
|
@ -269,11 +278,69 @@ func TestEventStore_LastEventKey(t *testing.T) {
|
|||
Resource: "resource",
|
||||
Name: "test-2",
|
||||
ResourceVersion: 3000,
|
||||
Action: "created",
|
||||
}
|
||||
|
||||
assert.Equal(t, expectedKey, lastKey)
|
||||
}
|
||||
|
||||
func TestEventStore_ListKeysSince(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := setupTestEventStore(t)
|
||||
|
||||
// Add events with different resource versions
|
||||
events := []Event{
|
||||
{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Name: "test-1",
|
||||
ResourceVersion: 1000,
|
||||
Action: DataActionCreated,
|
||||
},
|
||||
{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Name: "test-2",
|
||||
ResourceVersion: 2000,
|
||||
Action: DataActionUpdated,
|
||||
},
|
||||
{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
Name: "test-3",
|
||||
ResourceVersion: 3000,
|
||||
Action: DataActionDeleted,
|
||||
},
|
||||
}
|
||||
|
||||
// Save all events
|
||||
for _, event := range events {
|
||||
err := store.Save(ctx, event)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// List events since RV 1500 (should get events with RV 2000 and 3000)
|
||||
retrievedEvents := make([]string, 0, 2)
|
||||
for eventKey, err := range store.ListKeysSince(ctx, 1500) {
|
||||
require.NoError(t, err)
|
||||
retrievedEvents = append(retrievedEvents, eventKey)
|
||||
}
|
||||
|
||||
// Should return events in ascending order of resource version
|
||||
require.Len(t, retrievedEvents, 2)
|
||||
evt1, err := ParseEventKey(retrievedEvents[0])
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(2000), evt1.ResourceVersion)
|
||||
assert.Equal(t, "test-2", evt1.Name)
|
||||
evt2, err := ParseEventKey(retrievedEvents[1])
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(3000), evt2.ResourceVersion)
|
||||
assert.Equal(t, "test-3", evt2.Name)
|
||||
}
|
||||
|
||||
func TestEventStore_ListSince(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := setupTestEventStore(t)
|
||||
|
@ -322,7 +389,11 @@ func TestEventStore_ListSince(t *testing.T) {
|
|||
// Should return events in descending order of resource version
|
||||
require.Len(t, retrievedEvents, 2)
|
||||
assert.Equal(t, int64(2000), retrievedEvents[0].ResourceVersion)
|
||||
assert.Equal(t, "test-2", retrievedEvents[0].Name)
|
||||
assert.Equal(t, DataActionUpdated, retrievedEvents[0].Action)
|
||||
assert.Equal(t, int64(3000), retrievedEvents[1].ResourceVersion)
|
||||
assert.Equal(t, "test-3", retrievedEvents[1].Name)
|
||||
assert.Equal(t, DataActionDeleted, retrievedEvents[1].Action)
|
||||
}
|
||||
|
||||
func TestEventStore_ListSince_Empty(t *testing.T) {
|
||||
|
@ -370,6 +441,7 @@ func TestEventKey_Struct(t *testing.T) {
|
|||
Resource: "resource",
|
||||
Name: "test-resource",
|
||||
ResourceVersion: 1234567890,
|
||||
Action: "created",
|
||||
}
|
||||
|
||||
assert.Equal(t, "test-namespace", key.Namespace)
|
||||
|
|
|
@ -452,8 +452,187 @@ func applyPagination(keys []DataKey, lastSeenRV int64, sortAscending bool) []Dat
|
|||
}
|
||||
|
||||
func (k *kvStorageBackend) ListModifiedSince(ctx context.Context, key NamespacedResource, sinceRv int64) (int64, iter.Seq2[*ModifiedResource, error]) {
|
||||
if !key.Valid() {
|
||||
return 0, func(yield func(*ModifiedResource, error) bool) {
|
||||
yield(nil, errors.New("not implemented"))
|
||||
yield(nil, fmt.Errorf("group, resource, and namespace are required"))
|
||||
}
|
||||
}
|
||||
|
||||
if sinceRv <= 0 {
|
||||
return 0, func(yield func(*ModifiedResource, error) bool) {
|
||||
yield(nil, fmt.Errorf("sinceRv must be greater than 0"))
|
||||
}
|
||||
}
|
||||
|
||||
// Generate a new resource version for the list
|
||||
listRV := k.snowflake.Generate().Int64()
|
||||
|
||||
// Check if sinceRv is older than 1 hour
|
||||
sinceRvTimestamp := snowflake.ID(sinceRv).Time()
|
||||
sinceTime := time.Unix(0, sinceRvTimestamp*int64(time.Millisecond))
|
||||
sinceRvAge := time.Since(sinceTime)
|
||||
|
||||
if sinceRvAge > time.Hour {
|
||||
k.log.Debug("ListModifiedSince using data store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
|
||||
return listRV, k.listModifiedSinceDataStore(ctx, key, sinceRv)
|
||||
}
|
||||
|
||||
k.log.Debug("ListModifiedSince using event store", "sinceRv", sinceRv, "sinceRvAge", sinceRvAge)
|
||||
return listRV, k.listModifiedSinceEventStore(ctx, key, sinceRv)
|
||||
}
|
||||
|
||||
func convertEventType(action DataAction) resourcepb.WatchEvent_Type {
|
||||
switch action {
|
||||
case DataActionCreated:
|
||||
return resourcepb.WatchEvent_ADDED
|
||||
case DataActionUpdated:
|
||||
return resourcepb.WatchEvent_MODIFIED
|
||||
case DataActionDeleted:
|
||||
return resourcepb.WatchEvent_DELETED
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown DataAction: %v", action))
|
||||
}
|
||||
}
|
||||
|
||||
func (k *kvStorageBackend) getValueFromDataStore(ctx context.Context, dataKey DataKey) ([]byte, error) {
|
||||
raw, err := k.dataStore.Get(ctx, dataKey)
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
|
||||
value, err := io.ReadAll(raw)
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
|
||||
return value, nil
|
||||
}
|
||||
|
||||
func (k *kvStorageBackend) listModifiedSinceDataStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] {
|
||||
return func(yield func(*ModifiedResource, error) bool) {
|
||||
var lastSeenResource *ModifiedResource
|
||||
var lastSeenDataKey DataKey
|
||||
for dataKey, err := range k.dataStore.Keys(ctx, ListRequestKey{Namespace: key.Namespace, Group: key.Group, Resource: key.Resource}) {
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
if dataKey.ResourceVersion < sinceRv {
|
||||
continue
|
||||
}
|
||||
|
||||
if lastSeenResource == nil {
|
||||
lastSeenResource = &ModifiedResource{
|
||||
Key: resourcepb.ResourceKey{
|
||||
Namespace: dataKey.Namespace,
|
||||
Group: dataKey.Group,
|
||||
Resource: dataKey.Resource,
|
||||
Name: dataKey.Name,
|
||||
},
|
||||
ResourceVersion: dataKey.ResourceVersion,
|
||||
Action: convertEventType(dataKey.Action),
|
||||
}
|
||||
lastSeenDataKey = dataKey
|
||||
}
|
||||
|
||||
if lastSeenResource.Key.Name != dataKey.Name {
|
||||
value, err := k.getValueFromDataStore(ctx, lastSeenDataKey)
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
lastSeenResource.Value = value
|
||||
|
||||
if !yield(lastSeenResource, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
lastSeenResource = &ModifiedResource{
|
||||
Key: resourcepb.ResourceKey{
|
||||
Namespace: dataKey.Namespace,
|
||||
Group: dataKey.Group,
|
||||
Resource: dataKey.Resource,
|
||||
Name: dataKey.Name,
|
||||
},
|
||||
ResourceVersion: dataKey.ResourceVersion,
|
||||
Action: convertEventType(dataKey.Action),
|
||||
}
|
||||
lastSeenDataKey = dataKey
|
||||
}
|
||||
|
||||
if lastSeenResource != nil {
|
||||
value, err := k.getValueFromDataStore(ctx, lastSeenDataKey)
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
lastSeenResource.Value = value
|
||||
|
||||
yield(lastSeenResource, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (k *kvStorageBackend) listModifiedSinceEventStore(ctx context.Context, key NamespacedResource, sinceRv int64) iter.Seq2[*ModifiedResource, error] {
|
||||
return func(yield func(*ModifiedResource, error) bool) {
|
||||
// store all events ordered by RV for the given tenant here
|
||||
eventKeys := make([]EventKey, 0)
|
||||
for evtKeyStr, err := range k.eventStore.ListKeysSince(ctx, sinceRv-defaultLookbackPeriod.Nanoseconds()) {
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
evtKey, err := ParseEventKey(evtKeyStr)
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
if evtKey.ResourceVersion < sinceRv {
|
||||
continue
|
||||
}
|
||||
|
||||
if evtKey.Group != key.Group || evtKey.Resource != key.Resource || evtKey.Namespace != key.Namespace {
|
||||
continue
|
||||
}
|
||||
|
||||
eventKeys = append(eventKeys, evtKey)
|
||||
}
|
||||
|
||||
// we only care about the latest revision of every resource in the list
|
||||
seen := make(map[string]struct{})
|
||||
for i := len(eventKeys) - 1; i >= 0; i -= 1 {
|
||||
evtKey := eventKeys[i]
|
||||
if _, ok := seen[evtKey.Name]; ok {
|
||||
continue
|
||||
}
|
||||
seen[evtKey.Name] = struct{}{}
|
||||
|
||||
value, err := k.getValueFromDataStore(ctx, DataKey(evtKey))
|
||||
if err != nil {
|
||||
yield(&ModifiedResource{}, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !yield(&ModifiedResource{
|
||||
Key: resourcepb.ResourceKey{
|
||||
Group: evtKey.Group,
|
||||
Resource: evtKey.Resource,
|
||||
Namespace: evtKey.Namespace,
|
||||
Name: evtKey.Name,
|
||||
},
|
||||
Action: convertEventType(evtKey.Action),
|
||||
ResourceVersion: evtKey.ResourceVersion,
|
||||
Value: value,
|
||||
}, nil) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,9 +4,13 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand/v2"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
|
@ -15,6 +19,12 @@ import (
|
|||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
var appsNamespace = NamespacedResource{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resource",
|
||||
}
|
||||
|
||||
func setupTestStorageBackend(t *testing.T) *kvStorageBackend {
|
||||
kv := setupTestKV(t)
|
||||
return NewKvStorageBackend(kv)
|
||||
|
@ -135,6 +145,7 @@ func TestKvStorageBackend_WriteEvent_Success(t *testing.T) {
|
|||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
ResourceVersion: rv,
|
||||
Action: expectedAction,
|
||||
}
|
||||
|
||||
_, err = backend.eventStore.Get(ctx, eventKey)
|
||||
|
@ -341,7 +352,12 @@ func TestKvStorageBackend_ListIterator_Success(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, res := range resources {
|
||||
testObj, err := createTestObjectWithName(res.name, res.group, res.value)
|
||||
ns := NamespacedResource{
|
||||
Group: res.group,
|
||||
Resource: "resource",
|
||||
Namespace: "default",
|
||||
}
|
||||
testObj, err := createTestObjectWithName(res.name, ns, res.value)
|
||||
require.NoError(t, err)
|
||||
|
||||
metaAccessor, err := utils.MetaAccessor(testObj)
|
||||
|
@ -424,7 +440,7 @@ func TestKvStorageBackend_ListIterator_WithPagination(t *testing.T) {
|
|||
|
||||
// Create multiple test resources
|
||||
for i := 1; i <= 5; i++ {
|
||||
testObj, err := createTestObjectWithName(fmt.Sprintf("resource-%d", i), "apps", fmt.Sprintf("data-%d", i))
|
||||
testObj, err := createTestObjectWithName(fmt.Sprintf("resource-%d", i), appsNamespace, fmt.Sprintf("data-%d", i))
|
||||
require.NoError(t, err)
|
||||
|
||||
metaAccessor, err := utils.MetaAccessor(testObj)
|
||||
|
@ -603,7 +619,7 @@ func TestKvStorageBackend_ListIterator_SpecificResourceVersion(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
|
||||
// Create a resource
|
||||
testObj, err := createTestObjectWithName("test-resource", "apps", "initial-data")
|
||||
testObj, err := createTestObjectWithName("test-resource", appsNamespace, "initial-data")
|
||||
require.NoError(t, err)
|
||||
|
||||
metaAccessor, err := utils.MetaAccessor(testObj)
|
||||
|
@ -663,17 +679,248 @@ func TestKvStorageBackend_ListIterator_SpecificResourceVersion(t *testing.T) {
|
|||
require.Len(t, collectedItems, 1)
|
||||
|
||||
// Verify we got the original data, not the updated data
|
||||
originalObj, err := createTestObjectWithName("test-resource", "apps", "initial-data")
|
||||
originalObj, err := createTestObjectWithName("test-resource", appsNamespace, "initial-data")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objectToJSONBytes(t, originalObj), collectedItems[0])
|
||||
}
|
||||
|
||||
func TestKvStorageBackend_ListModifiedSince(t *testing.T) {
|
||||
backend := setupTestStorageBackend(t)
|
||||
ctx := context.Background()
|
||||
|
||||
ns := NamespacedResource{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resources",
|
||||
}
|
||||
|
||||
expectations := seedBackend(t, backend, ctx, ns)
|
||||
for _, expectation := range expectations {
|
||||
_, seq := backend.ListModifiedSince(ctx, ns, expectation.rv)
|
||||
|
||||
for mr, err := range seq {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, mr.Key.Group, ns.Group)
|
||||
require.Equal(t, mr.Key.Namespace, ns.Namespace)
|
||||
require.Equal(t, mr.Key.Resource, ns.Resource)
|
||||
|
||||
expectedMr, ok := expectation.changes[mr.Key.Name]
|
||||
require.True(t, ok, "ListModifiedSince yielded unexpected resource: ", mr.Key.String())
|
||||
require.Equal(t, mr.ResourceVersion, expectedMr.ResourceVersion)
|
||||
require.Equal(t, mr.Action, expectedMr.Action)
|
||||
require.Equal(t, string(mr.Value), string(expectedMr.Value))
|
||||
delete(expectation.changes, mr.Key.Name)
|
||||
}
|
||||
|
||||
require.Equal(t, 0, len(expectation.changes), "ListModifiedSince failed to return one or more expected items")
|
||||
}
|
||||
}
|
||||
|
||||
type expectation struct {
|
||||
rv int64
|
||||
changes map[string]*ModifiedResource
|
||||
}
|
||||
|
||||
func randomString() string {
|
||||
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
|
||||
result := make([]byte, 16)
|
||||
for i := range result {
|
||||
result[i] = charset[rand.IntN(len(charset))]
|
||||
}
|
||||
return string(result)
|
||||
}
|
||||
|
||||
func randomStringGenerator() func() string {
|
||||
generated := make([]string, 0)
|
||||
return func() string {
|
||||
var str string
|
||||
for str == "" {
|
||||
randString := randomString()
|
||||
if !slices.Contains(generated, randString) {
|
||||
str = randString
|
||||
}
|
||||
}
|
||||
return str
|
||||
}
|
||||
}
|
||||
|
||||
// creates 2 hour old snowflake for testing
|
||||
func generateOldSnowflake(t *testing.T) int64 {
|
||||
// Generate a current snowflake first
|
||||
node, err := snowflake.NewNode(1)
|
||||
require.NoError(t, err)
|
||||
currentSnowflake := node.Generate().Int64()
|
||||
|
||||
// Extract its timestamp component by shifting right
|
||||
currentTimestamp := currentSnowflake >> 22
|
||||
|
||||
// Subtract 2 hours (in milliseconds) from the timestamp
|
||||
twoHoursMs := int64(2 * time.Hour / time.Millisecond)
|
||||
oldTimestamp := currentTimestamp - twoHoursMs
|
||||
|
||||
// Reconstruct snowflake: [timestamp:41][node:10][sequence:12]
|
||||
// Keep the original node and sequence bits
|
||||
nodeAndSequence := currentSnowflake & 0x3FFFFF // Bottom 22 bits (10 node + 12 sequence)
|
||||
snowflakeID := (oldTimestamp << 22) | nodeAndSequence
|
||||
|
||||
return snowflakeID
|
||||
}
|
||||
|
||||
// seedBackend seeds the kvstore with data and return the expected result for ListModifiedSince calls
|
||||
func seedBackend(t *testing.T, backend *kvStorageBackend, ctx context.Context, ns NamespacedResource) []expectation {
|
||||
uniqueStringGen := randomStringGenerator()
|
||||
nsDifferentNamespace := NamespacedResource{
|
||||
Namespace: "uaoeueao",
|
||||
Group: ns.Group,
|
||||
Resource: ns.Resource,
|
||||
}
|
||||
|
||||
expectations := make([]expectation, 0)
|
||||
// initial test will contain the same "changes" as the second one (first one added by the for loop below)
|
||||
// this is done with a 2 hour old RV so it uses the event store instead of the data store to check for changes
|
||||
expectations = append(expectations, expectation{
|
||||
rv: generateOldSnowflake(t),
|
||||
changes: make(map[string]*ModifiedResource),
|
||||
})
|
||||
|
||||
for range 100 {
|
||||
updates := rand.IntN(5)
|
||||
shouldDelete := rand.IntN(100) < 10
|
||||
mr := createAndSaveTestObject(t, backend, ctx, ns, uniqueStringGen, updates, shouldDelete)
|
||||
expectations = append(expectations, expectation{
|
||||
rv: mr.ResourceVersion,
|
||||
changes: make(map[string]*ModifiedResource),
|
||||
})
|
||||
|
||||
for _, expect := range expectations {
|
||||
expect.changes[mr.Key.Name] = mr
|
||||
}
|
||||
|
||||
// also seed data to some random namespace to make sure we won't return this data
|
||||
updates = rand.IntN(5)
|
||||
shouldDelete = rand.IntN(100) < 10
|
||||
_ = createAndSaveTestObject(t, backend, ctx, nsDifferentNamespace, uniqueStringGen, updates, shouldDelete)
|
||||
}
|
||||
|
||||
// last test will simulate calling ListModifiedSince with a newer RV than all the updates above
|
||||
rv, _ := backend.ListModifiedSince(ctx, ns, 1)
|
||||
expectations = append(expectations, expectation{
|
||||
rv: rv,
|
||||
changes: make(map[string]*ModifiedResource), // empty
|
||||
})
|
||||
|
||||
return expectations
|
||||
}
|
||||
|
||||
func createAndSaveTestObject(t *testing.T, backend *kvStorageBackend, ctx context.Context, ns NamespacedResource, uniqueStringGen func() string, updates int, deleted bool) *ModifiedResource {
|
||||
name := uniqueStringGen()
|
||||
action := resourcepb.WatchEvent_ADDED
|
||||
rv, testObj := addTestObject(t, backend, ctx, ns, name, uniqueStringGen())
|
||||
|
||||
for i := 0; i < updates; i += 1 {
|
||||
rv = updateTestObject(t, backend, ctx, testObj, rv, ns, name, uniqueStringGen())
|
||||
action = resourcepb.WatchEvent_MODIFIED
|
||||
}
|
||||
|
||||
if deleted {
|
||||
rv = deleteTestObject(t, backend, ctx, testObj, rv, ns, name)
|
||||
action = resourcepb.WatchEvent_DELETED
|
||||
}
|
||||
|
||||
value, err := testObj.MarshalJSON()
|
||||
require.NoError(t, err)
|
||||
|
||||
return &ModifiedResource{
|
||||
Key: resourcepb.ResourceKey{
|
||||
Namespace: ns.Namespace,
|
||||
Group: ns.Group,
|
||||
Resource: ns.Resource,
|
||||
Name: name,
|
||||
},
|
||||
ResourceVersion: rv,
|
||||
Action: action,
|
||||
Value: value,
|
||||
}
|
||||
}
|
||||
|
||||
func addTestObject(t *testing.T, backend *kvStorageBackend, ctx context.Context, ns NamespacedResource, name, value string) (int64, *unstructured.Unstructured) {
|
||||
testObj, err := createTestObjectWithName(name, ns, value)
|
||||
require.NoError(t, err)
|
||||
|
||||
metaAccessor, err := utils.MetaAccessor(testObj)
|
||||
require.NoError(t, err)
|
||||
|
||||
writeEvent := WriteEvent{
|
||||
Type: resourcepb.WatchEvent_ADDED,
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Namespace: ns.Namespace,
|
||||
Group: ns.Group,
|
||||
Resource: ns.Resource,
|
||||
Name: name,
|
||||
},
|
||||
Value: objectToJSONBytes(t, testObj),
|
||||
Object: metaAccessor,
|
||||
PreviousRV: 0,
|
||||
}
|
||||
|
||||
rv, err := backend.WriteEvent(ctx, writeEvent)
|
||||
require.NoError(t, err)
|
||||
return rv, testObj
|
||||
}
|
||||
|
||||
func deleteTestObject(t *testing.T, backend *kvStorageBackend, ctx context.Context, originalObj *unstructured.Unstructured, previousRV int64, ns NamespacedResource, name string) int64 {
|
||||
metaAccessor, err := utils.MetaAccessor(originalObj)
|
||||
require.NoError(t, err)
|
||||
|
||||
writeEvent := WriteEvent{
|
||||
Type: resourcepb.WatchEvent_DELETED,
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Namespace: ns.Namespace,
|
||||
Group: ns.Group,
|
||||
Resource: ns.Resource,
|
||||
Name: name,
|
||||
},
|
||||
Value: objectToJSONBytes(t, originalObj),
|
||||
Object: metaAccessor,
|
||||
ObjectOld: metaAccessor,
|
||||
PreviousRV: previousRV,
|
||||
}
|
||||
|
||||
rv, err := backend.WriteEvent(ctx, writeEvent)
|
||||
require.NoError(t, err)
|
||||
return rv
|
||||
}
|
||||
|
||||
func updateTestObject(t *testing.T, backend *kvStorageBackend, ctx context.Context, originalObj *unstructured.Unstructured, previousRV int64, ns NamespacedResource, name, value string) int64 {
|
||||
originalObj.Object["spec"].(map[string]any)["value"] = value
|
||||
|
||||
metaAccessor, err := utils.MetaAccessor(originalObj)
|
||||
require.NoError(t, err)
|
||||
|
||||
writeEvent := WriteEvent{
|
||||
Type: resourcepb.WatchEvent_MODIFIED,
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Namespace: ns.Namespace,
|
||||
Group: ns.Group,
|
||||
Resource: ns.Resource,
|
||||
Name: name,
|
||||
},
|
||||
Value: objectToJSONBytes(t, originalObj),
|
||||
Object: metaAccessor,
|
||||
PreviousRV: previousRV,
|
||||
}
|
||||
|
||||
rv, err := backend.WriteEvent(ctx, writeEvent)
|
||||
require.NoError(t, err)
|
||||
return rv
|
||||
}
|
||||
|
||||
func TestKvStorageBackend_ListHistory_Success(t *testing.T) {
|
||||
backend := setupTestStorageBackend(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Create initial resource
|
||||
testObj, err := createTestObjectWithName("test-resource", "apps", "initial-data")
|
||||
testObj, err := createTestObjectWithName("test-resource", appsNamespace, "initial-data")
|
||||
require.NoError(t, err)
|
||||
|
||||
metaAccessor, err := utils.MetaAccessor(testObj)
|
||||
|
@ -757,15 +1004,15 @@ func TestKvStorageBackend_ListHistory_Success(t *testing.T) {
|
|||
require.Equal(t, rv1, historyItems[2].resourceVersion)
|
||||
|
||||
// Verify the content matches expectations for all versions
|
||||
finalObj, err := createTestObjectWithName("test-resource", "apps", "final-data")
|
||||
finalObj, err := createTestObjectWithName("test-resource", appsNamespace, "final-data")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objectToJSONBytes(t, finalObj), historyItems[0].value)
|
||||
|
||||
updatedObj, err := createTestObjectWithName("test-resource", "apps", "updated-data")
|
||||
updatedObj, err := createTestObjectWithName("test-resource", appsNamespace, "updated-data")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objectToJSONBytes(t, updatedObj), historyItems[1].value)
|
||||
|
||||
initialObj, err := createTestObjectWithName("test-resource", "apps", "initial-data")
|
||||
initialObj, err := createTestObjectWithName("test-resource", appsNamespace, "initial-data")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objectToJSONBytes(t, initialObj), historyItems[2].value)
|
||||
}
|
||||
|
@ -775,7 +1022,7 @@ func TestKvStorageBackend_ListTrash_Success(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
|
||||
// Create a resource
|
||||
testObj, err := createTestObjectWithName("test-resource", "apps", "test-data")
|
||||
testObj, err := createTestObjectWithName("test-resource", appsNamespace, "test-data")
|
||||
require.NoError(t, err)
|
||||
|
||||
metaAccessor, err := utils.MetaAccessor(testObj)
|
||||
|
@ -873,7 +1120,12 @@ func TestKvStorageBackend_GetResourceStats_Success(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, res := range resources {
|
||||
testObj, err := createTestObjectWithName(res.name, res.group, "test-data")
|
||||
ns := NamespacedResource{
|
||||
Group: res.group,
|
||||
Namespace: "default",
|
||||
Resource: "resource",
|
||||
}
|
||||
testObj, err := createTestObjectWithName(res.name, ns, "test-data")
|
||||
require.NoError(t, err)
|
||||
|
||||
metaAccessor, err := utils.MetaAccessor(testObj)
|
||||
|
@ -930,7 +1182,7 @@ func TestKvStorageBackend_GetResourceStats_Success(t *testing.T) {
|
|||
|
||||
// createTestObject creates a test unstructured object with standard values
|
||||
func createTestObject() (*unstructured.Unstructured, error) {
|
||||
return createTestObjectWithName("test-resource", "apps", "test data")
|
||||
return createTestObjectWithName("test-resource", appsNamespace, "test data")
|
||||
}
|
||||
|
||||
// objectToJSONBytes converts an unstructured object to JSON bytes
|
||||
|
@ -941,14 +1193,14 @@ func objectToJSONBytes(t *testing.T, obj *unstructured.Unstructured) []byte {
|
|||
}
|
||||
|
||||
// createTestObjectWithName creates a test unstructured object with specific name, group and value
|
||||
func createTestObjectWithName(name, group, value string) (*unstructured.Unstructured, error) {
|
||||
func createTestObjectWithName(name string, ns NamespacedResource, value string) (*unstructured.Unstructured, error) {
|
||||
u := &unstructured.Unstructured{
|
||||
Object: map[string]any{
|
||||
"apiVersion": group + "/v1",
|
||||
"kind": "resource",
|
||||
"apiVersion": ns.Group + "/v1",
|
||||
"kind": ns.Resource,
|
||||
"metadata": map[string]any{
|
||||
"name": name,
|
||||
"namespace": "default",
|
||||
"namespace": ns.Namespace,
|
||||
},
|
||||
"spec": map[string]any{
|
||||
"value": value,
|
||||
|
|
Loading…
Reference in New Issue