mirror of https://github.com/grafana/grafana.git
Unified Storage: Add sort order to keys func in datastore (#110714)
* Add sort order to the Keys func in datastore. Add a test to ensure deleted events are not pruned. * Include the sort field in the ListRequestKey instead of passing it as a separate parameter.
This commit is contained in:
parent
b6567e5abc
commit
310893292f
|
|
@ -106,6 +106,7 @@ type ListRequestKey struct {
|
|||
Group string
|
||||
Resource string
|
||||
Name string
|
||||
Sort SortOrder
|
||||
}
|
||||
|
||||
func (k ListRequestKey) Validate() error {
|
||||
|
|
@ -194,6 +195,7 @@ func (d *dataStore) Keys(ctx context.Context, key ListRequestKey) iter.Seq2[Data
|
|||
for k, err := range d.kv.Keys(ctx, dataSection, ListOptions{
|
||||
StartKey: prefix,
|
||||
EndKey: PrefixRangeEnd(prefix),
|
||||
Sort: key.Sort,
|
||||
}) {
|
||||
if err != nil {
|
||||
yield(DataKey{}, err)
|
||||
|
|
|
|||
|
|
@ -85,25 +85,33 @@ func (k *kvStorageBackend) pruneEvents(ctx context.Context, key PruningKey) erro
|
|||
return fmt.Errorf("invalid pruning key, all fields must be set: %+v", key)
|
||||
}
|
||||
|
||||
keepEvents := make([]DataKey, 0, prunerMaxEvents)
|
||||
|
||||
listKey := ListRequestKey{
|
||||
Namespace: key.Namespace,
|
||||
Group: key.Group,
|
||||
Resource: key.Resource,
|
||||
Name: key.Name,
|
||||
Sort: SortOrderDesc,
|
||||
}
|
||||
counter := 0
|
||||
// iterate over all keys for the resource and delete versions beyond the latest 20
|
||||
for datakey, err := range k.dataStore.Keys(ctx, ListRequestKey(key)) {
|
||||
for datakey, err := range k.dataStore.Keys(ctx, listKey) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(keepEvents) < prunerMaxEvents {
|
||||
keepEvents = append(keepEvents, datakey)
|
||||
// Pruner needs to exclude deleted events
|
||||
if counter < prunerMaxEvents && datakey.Action != DataActionDeleted {
|
||||
counter++
|
||||
continue
|
||||
}
|
||||
|
||||
// If we already have 20 versions, delete the oldest one and append the new one
|
||||
err := k.dataStore.Delete(ctx, keepEvents[0])
|
||||
if err != nil {
|
||||
return err
|
||||
// If we already have 20 versions, delete any more create or update events
|
||||
if datakey.Action != DataActionDeleted {
|
||||
err := k.dataStore.Delete(ctx, datakey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
keepEvents = append(keepEvents[1:], datakey)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -1258,6 +1258,7 @@ func TestKvStorageBackend_PruneEvents(t *testing.T) {
|
|||
Group: "apps",
|
||||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
Sort: SortOrderDesc,
|
||||
}) {
|
||||
require.NoError(t, err)
|
||||
require.NotEqual(t, rv1, datakey.ResourceVersion)
|
||||
|
|
@ -1324,12 +1325,81 @@ func TestKvStorageBackend_PruneEvents(t *testing.T) {
|
|||
Group: "apps",
|
||||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
Sort: SortOrderDesc,
|
||||
}) {
|
||||
require.NoError(t, err)
|
||||
counter++
|
||||
}
|
||||
require.Equal(t, prunerMaxEvents, counter)
|
||||
})
|
||||
|
||||
t.Run("will not prune deleted events", func(t *testing.T) {
|
||||
backend := setupTestStorageBackend(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a resource
|
||||
ns := NamespacedResource{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resources",
|
||||
}
|
||||
testObj, err := createTestObjectWithName("test-resource", ns, "test-data")
|
||||
require.NoError(t, err)
|
||||
metaAccessor, err := utils.MetaAccessor(testObj)
|
||||
require.NoError(t, err)
|
||||
writeEvent := WriteEvent{
|
||||
Type: resourcepb.WatchEvent_DELETED,
|
||||
Key: &resourcepb.ResourceKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
},
|
||||
Value: objectToJSONBytes(t, testObj),
|
||||
Object: metaAccessor,
|
||||
ObjectOld: metaAccessor,
|
||||
PreviousRV: 0,
|
||||
}
|
||||
rv1, err := backend.WriteEvent(ctx, writeEvent)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Add prunerMaxEvents+1 deleted events
|
||||
// Multiple deleted events for a resource shouldn't happen - this is just to ensure the pruner won't remove deleted events
|
||||
previousRV := rv1
|
||||
for i := 0; i < prunerMaxEvents; i++ {
|
||||
testObj.Object["spec"].(map[string]any)["value"] = fmt.Sprintf("delete-%d", i)
|
||||
writeEvent.Type = resourcepb.WatchEvent_DELETED
|
||||
writeEvent.Value = objectToJSONBytes(t, testObj)
|
||||
writeEvent.PreviousRV = previousRV
|
||||
newRv, err := backend.WriteEvent(ctx, writeEvent)
|
||||
require.NoError(t, err)
|
||||
previousRV = newRv
|
||||
}
|
||||
|
||||
pruningKey := PruningKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
}
|
||||
|
||||
err = backend.pruneEvents(ctx, pruningKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
// assert all deleted events exist
|
||||
counter := 0
|
||||
for _, err := range backend.dataStore.Keys(ctx, ListRequestKey{
|
||||
Namespace: "default",
|
||||
Group: "apps",
|
||||
Resource: "resources",
|
||||
Name: "test-resource",
|
||||
Sort: SortOrderDesc,
|
||||
}) {
|
||||
require.NoError(t, err)
|
||||
counter++
|
||||
}
|
||||
require.Equal(t, prunerMaxEvents+1, counter)
|
||||
})
|
||||
}
|
||||
|
||||
// createTestObject creates a test unstructured object with standard values
|
||||
|
|
|
|||
Loading…
Reference in New Issue