mirror of https://github.com/grafana/grafana.git
				
				
				
			
		
			
				
	
	
		
			935 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			935 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
| package sql
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"database/sql"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"math"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/go-sql-driver/mysql"
 | |
| 	"github.com/google/uuid"
 | |
| 	"github.com/jackc/pgx/v5/pgconn"
 | |
| 	"github.com/lib/pq"
 | |
| 	"github.com/mattn/go-sqlite3"
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 	"go.opentelemetry.io/otel/trace"
 | |
| 	"go.opentelemetry.io/otel/trace/noop"
 | |
| 	"google.golang.org/protobuf/proto"
 | |
| 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | |
| 
 | |
| 	"github.com/grafana/grafana-app-sdk/logging"
 | |
| 
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/resource"
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/sql/db"
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
 | |
| 	"github.com/grafana/grafana/pkg/util/debouncer"
 | |
| )
 | |
| 
 | |
// Defaults and shared constants for the SQL resource backend.
const (
	// tracePrefix is prepended to the span names started by this backend.
	tracePrefix = "sql.resource."

	// defaultPollingInterval is used when BackendOptions.PollingInterval is zero.
	defaultPollingInterval = 100 * time.Millisecond

	// defaultWatchBufferSize is the number of events to buffer in the watch
	// stream; it is used when BackendOptions.WatchBufferSize is zero.
	defaultWatchBufferSize = 100

	// defaultPrunerHistoryLimit is the HistoryLimit passed to the history
	// prune query.
	defaultPrunerHistoryLimit = 20
)
 | |
| 
 | |
| type Backend interface {
 | |
| 	resource.StorageBackend
 | |
| 	resource.DiagnosticsServer
 | |
| 	resource.LifecycleHooks
 | |
| }
 | |
| 
 | |
| type BackendOptions struct {
 | |
| 	DBProvider      db.DBProvider
 | |
| 	Tracer          trace.Tracer
 | |
| 	Reg             prometheus.Registerer
 | |
| 	PollingInterval time.Duration
 | |
| 	WatchBufferSize int
 | |
| 	IsHA            bool
 | |
| 	storageMetrics  *resource.StorageMetrics
 | |
| 
 | |
| 	// If true, the backend will prune history on write events.
 | |
| 	// Will be removed once fully rolled out.
 | |
| 	withPruner bool
 | |
| 
 | |
| 	// testing
 | |
| 	SimulatedNetworkLatency time.Duration // slows down the create transactions by a fixed amount
 | |
| }
 | |
| 
 | |
| func NewBackend(opts BackendOptions) (Backend, error) {
 | |
| 	if opts.DBProvider == nil {
 | |
| 		return nil, errors.New("no db provider")
 | |
| 	}
 | |
| 	if opts.Tracer == nil {
 | |
| 		opts.Tracer = noop.NewTracerProvider().Tracer("sql-backend")
 | |
| 	}
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 
 | |
| 	if opts.PollingInterval == 0 {
 | |
| 		opts.PollingInterval = defaultPollingInterval
 | |
| 	}
 | |
| 	if opts.WatchBufferSize == 0 {
 | |
| 		opts.WatchBufferSize = defaultWatchBufferSize
 | |
| 	}
 | |
| 	return &backend{
 | |
| 		isHA:                    opts.IsHA,
 | |
| 		done:                    ctx.Done(),
 | |
| 		cancel:                  cancel,
 | |
| 		log:                     logging.DefaultLogger.With("logger", "sql-resource-server"),
 | |
| 		tracer:                  opts.Tracer,
 | |
| 		reg:                     opts.Reg,
 | |
| 		dbProvider:              opts.DBProvider,
 | |
| 		pollingInterval:         opts.PollingInterval,
 | |
| 		watchBufferSize:         opts.WatchBufferSize,
 | |
| 		storageMetrics:          opts.storageMetrics,
 | |
| 		bulkLock:                &bulkLock{running: make(map[string]bool)},
 | |
| 		simulatedNetworkLatency: opts.SimulatedNetworkLatency,
 | |
| 		withPruner:              opts.withPruner,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
// pruningKey identifies a single resource whose history should be pruned.
// It holds only strings, so it is comparable and can be used as a map key.
type pruningKey struct {
	namespace string
	group     string
	resource  string
	name      string
}
 | |
| 
 | |
| // Small abstraction to allow for different pruner implementations.
 | |
| // This can be removed once the debouncer is deployed.
 | |
| type pruner interface {
 | |
| 	Add(key pruningKey) error
 | |
| 	Start(ctx context.Context)
 | |
| }
 | |
| 
 | |
| type noopPruner struct{}
 | |
| 
 | |
| func (p *noopPruner) Add(key pruningKey) error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (p *noopPruner) Start(ctx context.Context) {}
 | |
| 
 | |
| type backend struct {
 | |
| 	//general
 | |
| 	isHA bool
 | |
| 
 | |
| 	// server lifecycle
 | |
| 	done     <-chan struct{}
 | |
| 	cancel   context.CancelFunc
 | |
| 	initOnce sync.Once
 | |
| 	initErr  error
 | |
| 
 | |
| 	// o11y
 | |
| 	log            logging.Logger
 | |
| 	tracer         trace.Tracer
 | |
| 	reg            prometheus.Registerer
 | |
| 	storageMetrics *resource.StorageMetrics
 | |
| 
 | |
| 	// database
 | |
| 	dbProvider db.DBProvider
 | |
| 	db         db.DB
 | |
| 	dialect    sqltemplate.Dialect
 | |
| 	bulkLock   *bulkLock
 | |
| 
 | |
| 	// watch streaming
 | |
| 	//stream chan *resource.WatchEvent
 | |
| 	pollingInterval time.Duration
 | |
| 	watchBufferSize int
 | |
| 	notifier        eventNotifier
 | |
| 
 | |
| 	// resource version manager
 | |
| 	rvManager *resourceVersionManager
 | |
| 
 | |
| 	// testing
 | |
| 	simulatedNetworkLatency time.Duration
 | |
| 
 | |
| 	historyPruner pruner
 | |
| 	withPruner    bool
 | |
| }
 | |
| 
 | |
| func (b *backend) Init(ctx context.Context) error {
 | |
| 	b.initOnce.Do(func() {
 | |
| 		b.initErr = b.initLocked(ctx)
 | |
| 	})
 | |
| 	return b.initErr
 | |
| }
 | |
| 
 | |
| func (b *backend) initLocked(ctx context.Context) error {
 | |
| 	dbConn, err := b.dbProvider.Init(ctx)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("initialize resource DB: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	if err := dbConn.PingContext(ctx); err != nil {
 | |
| 		return fmt.Errorf("ping resource DB: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	b.db = dbConn
 | |
| 
 | |
| 	driverName := dbConn.DriverName()
 | |
| 	b.dialect = sqltemplate.DialectForDriver(driverName)
 | |
| 	if b.dialect == nil {
 | |
| 		return fmt.Errorf("no dialect for driver %q", driverName)
 | |
| 	}
 | |
| 
 | |
| 	// Initialize ResourceVersionManager
 | |
| 	rvManager, err := NewResourceVersionManager(ResourceManagerOptions{
 | |
| 		Dialect: b.dialect,
 | |
| 		DB:      b.db,
 | |
| 		Tracer:  b.tracer,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to create resource version manager: %w", err)
 | |
| 	}
 | |
| 	b.rvManager = rvManager
 | |
| 
 | |
| 	// Initialize notifier after dialect is set up
 | |
| 	notifier, err := newNotifier(b)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to create notifier: %w", err)
 | |
| 	}
 | |
| 	b.notifier = notifier
 | |
| 
 | |
| 	if err := b.initPruner(ctx); err != nil {
 | |
| 		return fmt.Errorf("failed to create pruner: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *backend) initPruner(ctx context.Context) error {
 | |
| 	if !b.withPruner {
 | |
| 		b.log.Debug("using noop history pruner")
 | |
| 		b.historyPruner = &noopPruner{}
 | |
| 		return nil
 | |
| 	}
 | |
| 	b.log.Debug("using debounced history pruner")
 | |
| 	// Initialize history pruner.
 | |
| 	pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[pruningKey]{
 | |
| 		Name:       "history_pruner",
 | |
| 		BufferSize: 1000,
 | |
| 		MinWait:    time.Second * 30,
 | |
| 		MaxWait:    time.Minute * 5,
 | |
| 		ProcessHandler: func(ctx context.Context, key pruningKey) error {
 | |
| 			return b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
 | |
| 				res, err := dbutil.Exec(ctx, tx, sqlResourceHistoryPrune, &sqlPruneHistoryRequest{
 | |
| 					SQLTemplate:  sqltemplate.New(b.dialect),
 | |
| 					HistoryLimit: defaultPrunerHistoryLimit,
 | |
| 					Key: &resource.ResourceKey{
 | |
| 						Namespace: key.namespace,
 | |
| 						Group:     key.group,
 | |
| 						Resource:  key.resource,
 | |
| 						Name:      key.name,
 | |
| 					},
 | |
| 				})
 | |
| 				if err != nil {
 | |
| 					return fmt.Errorf("failed to prune history: %w", err)
 | |
| 				}
 | |
| 				rows, err := res.RowsAffected()
 | |
| 				if err != nil {
 | |
| 					return fmt.Errorf("failed to get rows affected: %w", err)
 | |
| 				}
 | |
| 				b.log.Debug("pruned history successfully",
 | |
| 					"namespace", key.namespace,
 | |
| 					"group", key.group,
 | |
| 					"resource", key.resource,
 | |
| 					"name", key.name,
 | |
| 					"rows", rows)
 | |
| 				return nil
 | |
| 			})
 | |
| 		},
 | |
| 		ErrorHandler: func(key pruningKey, err error) {
 | |
| 			b.log.Error("failed to prune history",
 | |
| 				"namespace", key.namespace,
 | |
| 				"group", key.group,
 | |
| 				"resource", key.resource,
 | |
| 				"name", key.name,
 | |
| 				"error", err)
 | |
| 		},
 | |
| 		Reg: b.reg,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	b.historyPruner = pruner
 | |
| 	b.historyPruner.Start(ctx)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *backend) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
 | |
| 	// ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))
 | |
| 
 | |
| 	if err := b.db.PingContext(ctx); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil
 | |
| }
 | |
| 
 | |
| func (b *backend) Stop(_ context.Context) error {
 | |
| 	b.cancel()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetResourceStats implements Backend.
 | |
| func (b *backend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]resource.ResourceStats, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	req := &sqlStatsRequest{
 | |
| 		SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 		Namespace:   namespace,
 | |
| 		MinCount:    minCount, // not used in query... yet?
 | |
| 	}
 | |
| 
 | |
| 	res := make([]resource.ResourceStats, 0, 100)
 | |
| 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
 | |
| 		rows, err := dbutil.QueryRows(ctx, tx, sqlResourceStats, req)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		for rows.Next() {
 | |
| 			row := resource.ResourceStats{}
 | |
| 			err = rows.Scan(&row.Namespace, &row.Group, &row.Resource, &row.Count, &row.ResourceVersion)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			if row.Count > int64(minCount) {
 | |
| 				res = append(res, row)
 | |
| 			}
 | |
| 		}
 | |
| 		return err
 | |
| 	})
 | |
| 
 | |
| 	return res, err
 | |
| }
 | |
| 
 | |
| func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) {
 | |
| 	_, span := b.tracer.Start(ctx, tracePrefix+"WriteEvent")
 | |
| 	defer span.End()
 | |
| 	// TODO: validate key ?
 | |
| 	switch event.Type {
 | |
| 	case resource.WatchEvent_ADDED:
 | |
| 		return b.create(ctx, event)
 | |
| 	case resource.WatchEvent_MODIFIED:
 | |
| 		return b.update(ctx, event)
 | |
| 	case resource.WatchEvent_DELETED:
 | |
| 		return b.delete(ctx, event)
 | |
| 	default:
 | |
| 		return 0, fmt.Errorf("unsupported event type")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"Create")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	guid := uuid.New().String()
 | |
| 	folder := ""
 | |
| 	if event.Object != nil {
 | |
| 		folder = event.Object.GetFolder()
 | |
| 	}
 | |
| 
 | |
| 	rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
 | |
| 		// 1. Insert into resource
 | |
| 		if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
 | |
| 			SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 			WriteEvent:  event,
 | |
| 			Folder:      folder,
 | |
| 			GUID:        guid,
 | |
| 		}); err != nil {
 | |
| 			if isRowAlreadyExistsError(err) {
 | |
| 				return guid, resource.ErrResourceAlreadyExists
 | |
| 			}
 | |
| 			return guid, fmt.Errorf("insert into resource: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		// 2. Insert into resource history
 | |
| 		if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
 | |
| 			SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 			WriteEvent:  event,
 | |
| 			Folder:      folder,
 | |
| 			Generation:  event.Object.GetGeneration(),
 | |
| 			GUID:        guid,
 | |
| 		}); err != nil {
 | |
| 			return guid, fmt.Errorf("insert into resource history: %w", err)
 | |
| 		}
 | |
| 		_ = b.historyPruner.Add(pruningKey{
 | |
| 			namespace: event.Key.Namespace,
 | |
| 			group:     event.Key.Group,
 | |
| 			resource:  event.Key.Resource,
 | |
| 			name:      event.Key.Name,
 | |
| 		})
 | |
| 		if b.simulatedNetworkLatency > 0 {
 | |
| 			time.Sleep(b.simulatedNetworkLatency)
 | |
| 		}
 | |
| 		return guid, nil
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	b.notifier.send(ctx, &resource.WrittenEvent{
 | |
| 		Type:            event.Type,
 | |
| 		Key:             event.Key,
 | |
| 		PreviousRV:      event.PreviousRV,
 | |
| 		Value:           event.Value,
 | |
| 		ResourceVersion: rv,
 | |
| 		Folder:          folder,
 | |
| 	})
 | |
| 
 | |
| 	return rv, nil
 | |
| }
 | |
| 
 | |
| // isRowAlreadyExistsError checks if the error is the result of the row inserted already existing.
 | |
| func isRowAlreadyExistsError(err error) bool {
 | |
| 	var sqlite sqlite3.Error
 | |
| 	if errors.As(err, &sqlite) {
 | |
| 		return sqlite.ExtendedCode == sqlite3.ErrConstraintUnique
 | |
| 	}
 | |
| 
 | |
| 	var pg *pgconn.PgError
 | |
| 	if errors.As(err, &pg) {
 | |
| 		// https://www.postgresql.org/docs/current/errcodes-appendix.html
 | |
| 		return pg.Code == "23505" // unique_violation
 | |
| 	}
 | |
| 
 | |
| 	var pqerr *pq.Error
 | |
| 	if errors.As(err, &pqerr) {
 | |
| 		// https://www.postgresql.org/docs/current/errcodes-appendix.html
 | |
| 		return pqerr.Code == "23505" // unique_violation
 | |
| 	}
 | |
| 
 | |
| 	var mysqlerr *mysql.MySQLError
 | |
| 	if errors.As(err, &mysqlerr) {
 | |
| 		// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
 | |
| 		return mysqlerr.Number == 1062 // ER_DUP_ENTRY
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"Update")
 | |
| 	defer span.End()
 | |
| 	guid := uuid.New().String()
 | |
| 	folder := ""
 | |
| 	if event.Object != nil {
 | |
| 		folder = event.Object.GetFolder()
 | |
| 	}
 | |
| 
 | |
| 	// Use rvManager.ExecWithRV instead of direct transaction
 | |
| 	rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
 | |
| 		// 1. Update resource
 | |
| 		_, err := dbutil.Exec(ctx, tx, sqlResourceUpdate, sqlResourceRequest{
 | |
| 			SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 			WriteEvent:  event,
 | |
| 			Folder:      folder,
 | |
| 			GUID:        guid,
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			return guid, fmt.Errorf("resource update: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		// 2. Insert into resource history
 | |
| 		if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
 | |
| 			SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 			WriteEvent:  event,
 | |
| 			Folder:      folder,
 | |
| 			GUID:        guid,
 | |
| 			Generation:  event.Object.GetGeneration(),
 | |
| 		}); err != nil {
 | |
| 			return guid, fmt.Errorf("insert into resource history: %w", err)
 | |
| 		}
 | |
| 		_ = b.historyPruner.Add(pruningKey{
 | |
| 			namespace: event.Key.Namespace,
 | |
| 			group:     event.Key.Group,
 | |
| 			resource:  event.Key.Resource,
 | |
| 			name:      event.Key.Name,
 | |
| 		})
 | |
| 		return guid, nil
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	b.notifier.send(ctx, &resource.WrittenEvent{
 | |
| 		Type:            event.Type,
 | |
| 		Key:             event.Key,
 | |
| 		PreviousRV:      event.PreviousRV,
 | |
| 		Value:           event.Value,
 | |
| 		ResourceVersion: rv,
 | |
| 		Folder:          folder,
 | |
| 	})
 | |
| 
 | |
| 	return rv, nil
 | |
| }
 | |
| 
 | |
| func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"Delete")
 | |
| 	defer span.End()
 | |
| 	guid := uuid.New().String()
 | |
| 	folder := ""
 | |
| 	if event.Object != nil {
 | |
| 		folder = event.Object.GetFolder()
 | |
| 	}
 | |
| 	rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
 | |
| 		// 1. delete from resource
 | |
| 		_, err := dbutil.Exec(ctx, tx, sqlResourceDelete, sqlResourceRequest{
 | |
| 			SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 			WriteEvent:  event,
 | |
| 			GUID:        guid,
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			return guid, fmt.Errorf("delete resource: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		// 2. Add event to resource history
 | |
| 		if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
 | |
| 			SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 			WriteEvent:  event,
 | |
| 			Folder:      folder,
 | |
| 			GUID:        guid,
 | |
| 			Generation:  0, // object does not exist
 | |
| 		}); err != nil {
 | |
| 			return guid, fmt.Errorf("insert into resource history: %w", err)
 | |
| 		}
 | |
| 		_ = b.historyPruner.Add(pruningKey{
 | |
| 			namespace: event.Key.Namespace,
 | |
| 			group:     event.Key.Group,
 | |
| 			resource:  event.Key.Resource,
 | |
| 			name:      event.Key.Name,
 | |
| 		})
 | |
| 		return guid, nil
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	b.notifier.send(ctx, &resource.WrittenEvent{
 | |
| 		Type:            event.Type,
 | |
| 		Key:             event.Key,
 | |
| 		PreviousRV:      event.PreviousRV,
 | |
| 		Value:           event.Value,
 | |
| 		ResourceVersion: rv,
 | |
| 		Folder:          folder,
 | |
| 	})
 | |
| 
 | |
| 	return rv, nil
 | |
| }
 | |
| 
 | |
| func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *resource.BackendReadResponse {
 | |
| 	_, span := b.tracer.Start(ctx, tracePrefix+".Read")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	// TODO: validate key ?
 | |
| 
 | |
| 	if req.ResourceVersion > 0 {
 | |
| 		return b.readHistory(ctx, req.Key, req.ResourceVersion)
 | |
| 	}
 | |
| 
 | |
| 	readReq := &sqlResourceReadRequest{
 | |
| 		SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 		Request:     req,
 | |
| 		Response:    NewReadResponse(),
 | |
| 	}
 | |
| 	var res *resource.BackendReadResponse
 | |
| 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
 | |
| 		var err error
 | |
| 		res, err = dbutil.QueryRow(ctx, tx, sqlResourceRead, readReq)
 | |
| 		return err
 | |
| 	})
 | |
| 
 | |
| 	if errors.Is(err, sql.ErrNoRows) {
 | |
| 		return &resource.BackendReadResponse{
 | |
| 			Error: resource.NewNotFoundError(req.Key),
 | |
| 		}
 | |
| 	} else if err != nil {
 | |
| 		return &resource.BackendReadResponse{Error: resource.AsErrorResult(err)}
 | |
| 	}
 | |
| 
 | |
| 	return res
 | |
| }
 | |
| 
 | |
| func (b *backend) ListIterator(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"List")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	if err := resource.MigrateListRequestVersionMatch(req, b.log); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	if req.Options == nil || req.Options.Key.Group == "" || req.Options.Key.Resource == "" {
 | |
| 		return 0, fmt.Errorf("missing group or resource")
 | |
| 	}
 | |
| 
 | |
| 	if req.Source != resource.ListRequest_STORE {
 | |
| 		return b.getHistory(ctx, req, cb)
 | |
| 	}
 | |
| 
 | |
| 	// TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only).
 | |
| 
 | |
| 	// TODO: add support for RemainingItemCount
 | |
| 
 | |
| 	if req.ResourceVersion > 0 || req.NextPageToken != "" {
 | |
| 		return b.listAtRevision(ctx, req, cb)
 | |
| 	}
 | |
| 	return b.listLatest(ctx, req, cb)
 | |
| }
 | |
| 
 | |
| type listIter struct {
 | |
| 	rows    db.Rows
 | |
| 	offset  int64
 | |
| 	listRV  int64
 | |
| 	sortAsc bool
 | |
| 
 | |
| 	// any error
 | |
| 	err error
 | |
| 
 | |
| 	// The row
 | |
| 	guid      string
 | |
| 	rv        int64
 | |
| 	value     []byte
 | |
| 	namespace string
 | |
| 	resource  string
 | |
| 	group     string
 | |
| 	name      string
 | |
| 	folder    string
 | |
| }
 | |
| 
 | |
| // ContinueToken implements resource.ListIterator.
 | |
| func (l *listIter) ContinueToken() string {
 | |
| 	return resource.ContinueToken{ResourceVersion: l.listRV, StartOffset: l.offset, SortAscending: l.sortAsc}.String()
 | |
| }
 | |
| 
 | |
| func (l *listIter) ContinueTokenWithCurrentRV() string {
 | |
| 	return resource.ContinueToken{ResourceVersion: l.rv, StartOffset: l.offset, SortAscending: l.sortAsc}.String()
 | |
| }
 | |
| 
 | |
| func (l *listIter) Error() error {
 | |
| 	return l.err
 | |
| }
 | |
| 
 | |
| func (l *listIter) Name() string {
 | |
| 	return l.name
 | |
| }
 | |
| 
 | |
| func (l *listIter) Namespace() string {
 | |
| 	return l.namespace
 | |
| }
 | |
| 
 | |
| func (l *listIter) Folder() string {
 | |
| 	return l.folder
 | |
| }
 | |
| 
 | |
| // ResourceVersion implements resource.ListIterator.
 | |
| func (l *listIter) ResourceVersion() int64 {
 | |
| 	return l.rv
 | |
| }
 | |
| 
 | |
| // Value implements resource.ListIterator.
 | |
| func (l *listIter) Value() []byte {
 | |
| 	return l.value
 | |
| }
 | |
| 
 | |
| // Next implements resource.ListIterator.
 | |
| func (l *listIter) Next() bool {
 | |
| 	if l.rows.Next() {
 | |
| 		l.offset++
 | |
| 		l.err = l.rows.Scan(&l.guid, &l.rv, &l.namespace, &l.resource, &l.group, &l.name, &l.folder, &l.value)
 | |
| 		return true
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| var _ resource.ListIterator = (*listIter)(nil)
 | |
| 
 | |
| // listLatest fetches the resources from the resource table.
 | |
| func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"listLatest")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	if req.NextPageToken != "" {
 | |
| 		return 0, fmt.Errorf("only works for the first page")
 | |
| 	}
 | |
| 	if req.ResourceVersion > 0 {
 | |
| 		return 0, fmt.Errorf("only works for the 'latest' resource version")
 | |
| 	}
 | |
| 
 | |
| 	iter := &listIter{sortAsc: false}
 | |
| 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
 | |
| 		var err error
 | |
| 		iter.listRV, err = b.fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		listReq := sqlResourceListRequest{
 | |
| 			SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 			Request:     new(resource.ListRequest),
 | |
| 		}
 | |
| 		listReq.Request = proto.Clone(req).(*resource.ListRequest)
 | |
| 
 | |
| 		rows, err := dbutil.QueryRows(ctx, tx, sqlResourceList, listReq)
 | |
| 		if rows != nil {
 | |
| 			defer func() {
 | |
| 				if err := rows.Close(); err != nil {
 | |
| 					b.log.Warn("listLatest error closing rows", "error", err)
 | |
| 				}
 | |
| 			}()
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		iter.rows = rows
 | |
| 		return cb(iter)
 | |
| 	})
 | |
| 	return iter.listRV, err
 | |
| }
 | |
| 
 | |
| // listAtRevision fetches the resources from the resource_history table at a specific revision.
 | |
| func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"listAtRevision")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	// Get the RV
 | |
| 	iter := &listIter{listRV: req.ResourceVersion, sortAsc: false}
 | |
| 	if req.NextPageToken != "" {
 | |
| 		continueToken, err := resource.GetContinueToken(req.NextPageToken)
 | |
| 		if err != nil {
 | |
| 			return 0, fmt.Errorf("get continue token: %w", err)
 | |
| 		}
 | |
| 		iter.listRV = continueToken.ResourceVersion
 | |
| 		iter.offset = continueToken.StartOffset
 | |
| 
 | |
| 		if req.ResourceVersion != 0 && req.ResourceVersion != iter.listRV {
 | |
| 			return 0, apierrors.NewBadRequest("request resource version does not math token")
 | |
| 		}
 | |
| 	}
 | |
| 	if iter.listRV < 1 {
 | |
| 		return 0, apierrors.NewBadRequest("expecting an explicit resource version query")
 | |
| 	}
 | |
| 
 | |
| 	// The query below has the potential to be EXTREMELY slow if the resource_history table is big. May be helpful to know
 | |
| 	// which stack is calling this.
 | |
| 	b.log.Debug("listAtRevision", "ns", req.Options.Key.Namespace, "group", req.Options.Key.Group, "resource", req.Options.Key.Resource, "rv", iter.listRV)
 | |
| 
 | |
| 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
 | |
| 		limit := int64(0) // ignore limit
 | |
| 		if iter.offset > 0 {
 | |
| 			limit = math.MaxInt64 // a limit is required for offset
 | |
| 		}
 | |
| 		listReq := sqlResourceHistoryListRequest{
 | |
| 			SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 			Request: &historyListRequest{
 | |
| 				ResourceVersion: iter.listRV,
 | |
| 				Limit:           limit,
 | |
| 				Offset:          iter.offset,
 | |
| 				Options:         req.Options,
 | |
| 			},
 | |
| 		}
 | |
| 
 | |
| 		rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryList, listReq)
 | |
| 		if rows != nil {
 | |
| 			defer func() {
 | |
| 				if err := rows.Close(); err != nil {
 | |
| 					b.log.Warn("listAtRevision error closing rows", "error", err)
 | |
| 				}
 | |
| 			}()
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		iter.rows = rows
 | |
| 		return cb(iter)
 | |
| 	})
 | |
| 	return iter.listRV, err
 | |
| }
 | |
| 
 | |
| // readHistory fetches the resource history from the resource_history table.
 | |
| func (b *backend) readHistory(ctx context.Context, key *resource.ResourceKey, rv int64) *resource.BackendReadResponse {
 | |
| 	_, span := b.tracer.Start(ctx, tracePrefix+".ReadHistory")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	readReq := &sqlResourceHistoryReadRequest{
 | |
| 		SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 		Request: &historyReadRequest{
 | |
| 			Key:             key,
 | |
| 			ResourceVersion: rv,
 | |
| 		},
 | |
| 		Response: NewReadResponse(),
 | |
| 	}
 | |
| 
 | |
| 	var res *resource.BackendReadResponse
 | |
| 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
 | |
| 		var err error
 | |
| 		res, err = dbutil.QueryRow(ctx, tx, sqlResourceHistoryRead, readReq)
 | |
| 		return err
 | |
| 	})
 | |
| 
 | |
| 	if errors.Is(err, sql.ErrNoRows) {
 | |
| 		return &resource.BackendReadResponse{Error: resource.NewNotFoundError(key)}
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return &resource.BackendReadResponse{Error: resource.AsErrorResult(err)}
 | |
| 	}
 | |
| 
 | |
| 	return res
 | |
| }
 | |
| 
 | |
| // getHistory fetches the resource history from the resource_history table.
 | |
| func (b *backend) getHistory(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"getHistory")
 | |
| 	defer span.End()
 | |
| 	listReq := sqlGetHistoryRequest{
 | |
| 		SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 		Key:         req.Options.Key,
 | |
| 		Trash:       req.Source == resource.ListRequest_TRASH,
 | |
| 	}
 | |
| 
 | |
| 	// We are assuming that users want history in ascending order
 | |
| 	// when they are using NotOlderThan matching, and descending order
 | |
| 	// for Unset (default) and Exact matching.
 | |
| 	listReq.SortAscending = req.GetVersionMatchV2() == resource.ResourceVersionMatchV2_NotOlderThan
 | |
| 
 | |
| 	iter := &listIter{}
 | |
| 	if req.NextPageToken != "" {
 | |
| 		continueToken, err := resource.GetContinueToken(req.NextPageToken)
 | |
| 		if err != nil {
 | |
| 			return 0, fmt.Errorf("get continue token: %w", err)
 | |
| 		}
 | |
| 		listReq.StartRV = continueToken.ResourceVersion
 | |
| 		listReq.SortAscending = continueToken.SortAscending
 | |
| 	}
 | |
| 	iter.sortAsc = listReq.SortAscending
 | |
| 
 | |
| 	// Set ExactRV when using Exact matching
 | |
| 	if req.VersionMatchV2 == resource.ResourceVersionMatchV2_Exact {
 | |
| 		if req.ResourceVersion <= 0 {
 | |
| 			return 0, fmt.Errorf("expecting an explicit resource version query when using Exact matching")
 | |
| 		}
 | |
| 		listReq.ExactRV = req.ResourceVersion
 | |
| 	}
 | |
| 
 | |
| 	// Set MinRV when using NotOlderThan matching to filter at the database level
 | |
| 	if req.ResourceVersion > 0 && req.VersionMatchV2 == resource.ResourceVersionMatchV2_NotOlderThan {
 | |
| 		listReq.MinRV = req.ResourceVersion
 | |
| 	}
 | |
| 
 | |
| 	// Ignore last deleted history record when listing the trash, using exact matching or not older than matching with a specific RV
 | |
| 	useLatestDeletionAsMinRV := listReq.MinRV == 0 && !listReq.Trash && req.VersionMatchV2 != resource.ResourceVersionMatchV2_Exact
 | |
| 
 | |
| 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
 | |
| 		var err error
 | |
| 		iter.listRV, err = b.fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		if useLatestDeletionAsMinRV {
 | |
| 			latestDeletedRV, err := b.fetchLatestHistoryRV(ctx, tx, b.dialect, req.Options.Key, resource.WatchEvent_DELETED)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			listReq.MinRV = latestDeletedRV + 1
 | |
| 		}
 | |
| 
 | |
| 		rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryGet, listReq)
 | |
| 		if rows != nil {
 | |
| 			defer func() {
 | |
| 				if err := rows.Close(); err != nil {
 | |
| 					b.log.Warn("listLatest error closing rows", "error", err)
 | |
| 				}
 | |
| 			}()
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		iter.rows = rows
 | |
| 		return cb(iter)
 | |
| 	})
 | |
| 	return iter.listRV, err
 | |
| }
 | |
| 
 | |
| func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
 | |
| 	return b.notifier.notify(ctx)
 | |
| }
 | |
| 
 | |
| // listLatestRVs returns the latest resource version for each (Group, Resource) pair.
 | |
| func (b *backend) listLatestRVs(ctx context.Context) (groupResourceRV, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"listLatestRVs")
 | |
| 	defer span.End()
 | |
| 	var grvs []*groupResourceVersion
 | |
| 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
 | |
| 		var err error
 | |
| 		grvs, err = dbutil.Query(ctx, tx, sqlResourceVersionList, &sqlResourceVersionListRequest{
 | |
| 			SQLTemplate:          sqltemplate.New(b.dialect),
 | |
| 			groupResourceVersion: new(groupResourceVersion),
 | |
| 		})
 | |
| 
 | |
| 		return err
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	since := groupResourceRV{}
 | |
| 	for _, grv := range grvs {
 | |
| 		if since[grv.Group] == nil {
 | |
| 			since[grv.Group] = map[string]int64{}
 | |
| 		}
 | |
| 		since[grv.Group][grv.Resource] = grv.ResourceVersion
 | |
| 	}
 | |
| 
 | |
| 	return since, nil
 | |
| }
 | |
| 
 | |
| // fetchLatestRV returns the current maximum RV in the resource table
 | |
| func (b *backend) fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, group, resource string) (int64, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"fetchLatestRV")
 | |
| 	defer span.End()
 | |
| 	res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
 | |
| 		SQLTemplate: sqltemplate.New(d),
 | |
| 		Group:       group,
 | |
| 		Resource:    resource,
 | |
| 		ReadOnly:    true,
 | |
| 		Response:    new(resourceVersionResponse),
 | |
| 	})
 | |
| 	if errors.Is(err, sql.ErrNoRows) {
 | |
| 		return 1, nil
 | |
| 	} else if err != nil {
 | |
| 		return 0, fmt.Errorf("get resource version: %w", err)
 | |
| 	}
 | |
| 	return res.ResourceVersion, nil
 | |
| }
 | |
| 
 | |
| // fetchLatestHistoryRV returns the current maximum RV in the resource_history table
 | |
| func (b *backend) fetchLatestHistoryRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey, eventType resource.WatchEvent_Type) (int64, error) {
 | |
| 	ctx, span := b.tracer.Start(ctx, tracePrefix+"fetchLatestHistoryRV")
 | |
| 	defer span.End()
 | |
| 	res, err := dbutil.QueryRow(ctx, x, sqlResourceHistoryReadLatestRV, sqlResourceHistoryReadLatestRVRequest{
 | |
| 		SQLTemplate: sqltemplate.New(d),
 | |
| 		Request: &historyReadLatestRVRequest{
 | |
| 			Key:       key,
 | |
| 			EventType: eventType,
 | |
| 		},
 | |
| 		Response: new(resourceHistoryReadLatestRVResponse),
 | |
| 	})
 | |
| 	if errors.Is(err, sql.ErrNoRows) {
 | |
| 		return 0, nil
 | |
| 	} else if err != nil {
 | |
| 		return 0, fmt.Errorf("get resource version: %w", err)
 | |
| 	}
 | |
| 	return res.ResourceVersion, nil
 | |
| }
 |