unistore: split ListIterator and ListHistory in StorageBackend (#105654)

split listIterator from ListHistory
This commit is contained in:
Georges Chaudy 2025-05-23 15:00:18 +02:00 committed by GitHub
parent 392c1a71c9
commit 04d39cbbc6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 50 additions and 45 deletions

View File

@ -186,11 +186,6 @@ func (r *rowsWrapper) ContinueToken() string {
return r.row.token.String()
}
// ContinueTokenWithCurrentRV implements resource.ListIterator.
func (r *rowsWrapper) ContinueTokenWithCurrentRV() string {
return r.row.token.String()
}
// Error implements resource.ListIterator.
func (r *rowsWrapper) Error() error {
return r.err

View File

@ -229,7 +229,12 @@ func (a *dashboardSqlAccess) ReadResource(ctx context.Context, req *resourcepb.R
return rsp
}
// List implements AppendingStore.
// ListHistory implements StorageBackend.
func (a *dashboardSqlAccess) ListHistory(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
return a.ListIterator(ctx, req, cb)
}
// List implements StorageBackend.
func (a *dashboardSqlAccess) ListIterator(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
if req.ResourceVersion != 0 {
return 0, apierrors.NewBadRequest("List with explicit resourceVersion is not supported with this storage backend")

View File

@ -235,10 +235,6 @@ func isDeletedValue(raw []byte) bool {
}
func (s *cdkBackend) ListIterator(ctx context.Context, req *resourcepb.ListRequest, cb func(ListIterator) error) (int64, error) {
if req.Source != resourcepb.ListRequest_STORE {
return 0, fmt.Errorf("listing from history not supported in CDK backend")
}
resources, err := buildTree(ctx, s, req.Options.Key)
if err != nil {
return 0, err
@ -247,6 +243,10 @@ func (s *cdkBackend) ListIterator(ctx context.Context, req *resourcepb.ListReque
return resources.listRV, err
}
func (s *cdkBackend) ListHistory(ctx context.Context, req *resourcepb.ListRequest, cb func(ListIterator) error) (int64, error) {
return 0, fmt.Errorf("listing from history not supported in CDK backend")
}
func (s *cdkBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -336,11 +336,6 @@ func (c *cdkListIterator) ContinueToken() string {
return fmt.Sprintf("index:%d/key:%s", c.index, c.currentKey)
}
// ContinueTokenWithCurrentRV implements ListIterator.
func (c *cdkListIterator) ContinueTokenWithCurrentRV() string {
return fmt.Sprintf("index:%d/key:%s", c.index, c.currentKey)
}
// Name implements ListIterator.
func (c *cdkListIterator) Name() string {
return c.currentKey // TODO (parse name from key)

View File

@ -46,9 +46,6 @@ type ListIterator interface {
// ContinueToken returns the token that can be used to start iterating *after* this item
ContinueToken() string
// ContinueTokenWithCurrentRV returns the token that can be used to start iterating *before* this item
ContinueTokenWithCurrentRV() string
// ResourceVersion of the current item
ResourceVersion() int64
@ -102,6 +99,9 @@ type StorageBackend interface {
// but are the the final answer.
ListIterator(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error)
// ListHistory is like ListIterator, but it returns the history of a resource
ListHistory(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error)
// Get all events from the store
// For HA setups, this will be more events than the local WriteEvent above!
WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error)
@ -808,7 +808,7 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
}}, nil
}
rv, err := s.backend.ListIterator(ctx, req, func(iter ListIterator) error {
iterFunc := func(iter ListIterator) error {
for iter.Next() {
if err := iter.Error(); err != nil {
return err
@ -827,13 +827,6 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
rsp.Items = append(rsp.Items, item)
if len(rsp.Items) >= int(req.Limit) || pageBytes >= maxPageBytes {
t := iter.ContinueToken()
if req.Source == resourcepb.ListRequest_HISTORY || req.Source == resourcepb.ListRequest_TRASH {
// For history lists, we need to use the current RV in the continue token
// to ensure consistent pagination. The order depends on VersionMatch:
// - NotOlderThan: ascending order (oldest to newest)
// - Unset: descending order (newest to oldest)
t = iter.ContinueTokenWithCurrentRV()
}
if iter.Next() {
rsp.NextPageToken = t
}
@ -841,7 +834,18 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
}
}
return iter.Error()
})
}
var rv int64
switch req.Source {
case resourcepb.ListRequest_STORE:
rv, err = s.backend.ListIterator(ctx, req, iterFunc)
case resourcepb.ListRequest_HISTORY, resourcepb.ListRequest_TRASH:
rv, err = s.backend.ListHistory(ctx, req, iterFunc)
default:
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid list source: %v", req.Source))
}
if err != nil {
rsp.Error = AsErrorResult(err)
return rsp, nil

View File

@ -569,10 +569,6 @@ func (b *backend) ListIterator(ctx context.Context, req *resourcepb.ListRequest,
return 0, fmt.Errorf("missing group or resource")
}
if req.Source != resourcepb.ListRequest_STORE {
return b.getHistory(ctx, req, cb)
}
// TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only).
// TODO: add support for RemainingItemCount
@ -583,6 +579,13 @@ func (b *backend) ListIterator(ctx context.Context, req *resourcepb.ListRequest,
return b.listLatest(ctx, req, cb)
}
func (b *backend) ListHistory(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"ListHistory")
defer span.End()
return b.getHistory(ctx, req, cb)
}
// listLatest fetches the resources from the resource table.
func (b *backend) listLatest(ctx context.Context, req *resourcepb.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"listLatest")
@ -733,7 +736,9 @@ func (b *backend) getHistory(ctx context.Context, req *resourcepb.ListRequest, c
// for Unset (default) and Exact matching.
listReq.SortAscending = req.GetVersionMatchV2() == resourcepb.ResourceVersionMatchV2_NotOlderThan
iter := &listIter{}
iter := &listIter{
useCurrentRV: true, // use the current RV for the continue token instead of the listRV
}
if req.NextPageToken != "" {
continueToken, err := resource.GetContinueToken(req.NextPageToken)
if err != nil {

View File

@ -8,10 +8,11 @@ import (
var _ resource.ListIterator = (*listIter)(nil)
type listIter struct {
rows db.Rows
offset int64
listRV int64
sortAsc bool
rows db.Rows
offset int64
listRV int64
sortAsc bool
useCurrentRV bool
// any error
err error
@ -29,13 +30,12 @@ type listIter struct {
// ContinueToken implements resource.ListIterator.
func (l *listIter) ContinueToken() string {
if l.useCurrentRV {
return resource.ContinueToken{ResourceVersion: l.rv, StartOffset: l.offset, SortAscending: l.sortAsc}.String()
}
return resource.ContinueToken{ResourceVersion: l.listRV, StartOffset: l.offset, SortAscending: l.sortAsc}.String()
}
func (l *listIter) ContinueTokenWithCurrentRV() string {
return resource.ContinueToken{ResourceVersion: l.rv, StartOffset: l.offset, SortAscending: l.sortAsc}.String()
}
func (l *listIter) Error() error {
return l.err
}

View File

@ -219,7 +219,7 @@ func TestListIter(t *testing.T) {
require.Equal(t, expected, actual)
})
t.Run("ContinueTokenWithCurrentRV uses current row's RV", func(t *testing.T) {
t.Run("ContinueToken uses the current row's RV", func(t *testing.T) {
listReq := sqlResourceListRequest{
SQLTemplate: sqltemplate.New(dialect),
Request: new(resourcepb.ListRequest),
@ -229,14 +229,15 @@ func TestListIter(t *testing.T) {
require.NoError(t, err)
iter := &listIter{
rows: rows,
listRV: 300,
sortAsc: true,
rows: rows,
listRV: 300,
sortAsc: true,
useCurrentRV: true, // use the current RV for the continue token instead of the listRV
}
require.True(t, iter.Next())
token := iter.ContinueTokenWithCurrentRV()
token := iter.ContinueToken()
var actual resource.ContinueToken
b, err := base64.StdEncoding.DecodeString(token)