| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | package sql | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"database/sql" | 
					
						
							|  |  |  | 	"errors" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	"math" | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/google/uuid" | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | 	"go.opentelemetry.io/otel/attribute" | 
					
						
							|  |  |  | 	semconv "go.opentelemetry.io/otel/semconv/v1.26.0" | 
					
						
							| 
									
										
										
										
											2024-10-10 04:32:09 +08:00
										 |  |  | 	"go.opentelemetry.io/otel/trace" | 
					
						
							|  |  |  | 	"go.opentelemetry.io/otel/trace/noop" | 
					
						
							|  |  |  | 	"google.golang.org/protobuf/proto" | 
					
						
							|  |  |  | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/infra/log" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/storage/unified/resource" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/storage/unified/sql/db" | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-10 04:32:09 +08:00
										 |  |  | const tracePrefix = "sql.resource." | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | const defaultPollingInterval = 100 * time.Millisecond | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | type Backend interface { | 
					
						
							|  |  |  | 	resource.StorageBackend | 
					
						
							|  |  |  | 	resource.DiagnosticsServer | 
					
						
							|  |  |  | 	resource.LifecycleHooks | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | type BackendOptions struct { | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	DBProvider        db.DBProvider | 
					
						
							|  |  |  | 	Tracer            trace.Tracer | 
					
						
							|  |  |  | 	PollingInterval   time.Duration | 
					
						
							|  |  |  | 	SkipDataMigration bool | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | func NewBackend(opts BackendOptions) (Backend, error) { | 
					
						
							|  |  |  | 	if opts.DBProvider == nil { | 
					
						
							|  |  |  | 		return nil, errors.New("no db provider") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	if opts.Tracer == nil { | 
					
						
							|  |  |  | 		opts.Tracer = noop.NewTracerProvider().Tracer("sql-backend") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	ctx, cancel := context.WithCancel(context.Background()) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	pollingInterval := opts.PollingInterval | 
					
						
							|  |  |  | 	if pollingInterval == 0 { | 
					
						
							|  |  |  | 		pollingInterval = defaultPollingInterval | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	return &backend{ | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 		done:              ctx.Done(), | 
					
						
							|  |  |  | 		cancel:            cancel, | 
					
						
							|  |  |  | 		log:               log.New("sql-resource-server"), | 
					
						
							|  |  |  | 		tracer:            opts.Tracer, | 
					
						
							|  |  |  | 		dbProvider:        opts.DBProvider, | 
					
						
							|  |  |  | 		pollingInterval:   pollingInterval, | 
					
						
							|  |  |  | 		skipDataMigration: opts.SkipDataMigration, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type backend struct { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	// server lifecycle
 | 
					
						
							|  |  |  | 	done     <-chan struct{} | 
					
						
							|  |  |  | 	cancel   context.CancelFunc | 
					
						
							|  |  |  | 	initOnce sync.Once | 
					
						
							|  |  |  | 	initErr  error | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// o11y
 | 
					
						
							|  |  |  | 	log    log.Logger | 
					
						
							|  |  |  | 	tracer trace.Tracer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// database
 | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	dbProvider        db.DBProvider | 
					
						
							|  |  |  | 	db                db.DB | 
					
						
							|  |  |  | 	dialect           sqltemplate.Dialect | 
					
						
							|  |  |  | 	skipDataMigration bool | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// watch streaming
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	//stream chan *resource.WatchEvent
 | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	pollingInterval time.Duration | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | func (b *backend) Init(ctx context.Context) error { | 
					
						
							|  |  |  | 	b.initOnce.Do(func() { | 
					
						
							|  |  |  | 		b.initErr = b.initLocked(ctx) | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	return b.initErr | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | func (b *backend) initLocked(ctx context.Context) error { | 
					
						
							|  |  |  | 	db, err := b.dbProvider.Init(ctx) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		return fmt.Errorf("initialize resource DB: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	b.db = db | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	driverName := db.DriverName() | 
					
						
							|  |  |  | 	b.dialect = sqltemplate.DialectForDriver(driverName) | 
					
						
							|  |  |  | 	if b.dialect == nil { | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		return fmt.Errorf("no dialect for driver %q", driverName) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	// Process any data manipulation migrations
 | 
					
						
							|  |  |  | 	err = b.runStartupDataMigrations(ctx) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	return b.db.PingContext(ctx) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (b *backend) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) { | 
					
						
							|  |  |  | 	// ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	if err := b.db.PingContext(ctx); err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | func (b *backend) Stop(_ context.Context) error { | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	b.cancel() | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	return nil | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-04 01:20:27 +08:00
										 |  |  | // GetResourceStats implements Backend.
 | 
					
						
							| 
									
										
										
										
											2024-12-05 18:58:13 +08:00
										 |  |  | func (b *backend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]resource.ResourceStats, error) { | 
					
						
							| 
									
										
										
										
											2024-12-10 12:32:19 +08:00
										 |  |  | 	ctx, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats") | 
					
						
							| 
									
										
										
										
											2024-12-04 01:20:27 +08:00
										 |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	req := &sqlStatsRequest{ | 
					
						
							|  |  |  | 		SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							| 
									
										
										
										
											2024-12-05 18:58:13 +08:00
										 |  |  | 		Namespace:   namespace, | 
					
						
							| 
									
										
										
										
											2024-12-04 01:20:27 +08:00
										 |  |  | 		MinCount:    minCount, // not used in query... yet?
 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	res := make([]resource.ResourceStats, 0, 100) | 
					
						
							|  |  |  | 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { | 
					
						
							|  |  |  | 		rows, err := dbutil.QueryRows(ctx, tx, sqlResourceStats, req) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		for rows.Next() { | 
					
						
							|  |  |  | 			row := resource.ResourceStats{} | 
					
						
							|  |  |  | 			err = rows.Scan(&row.Namespace, &row.Group, &row.Resource, &row.Count, &row.ResourceVersion) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			if row.Count > int64(minCount) { | 
					
						
							|  |  |  | 				res = append(res, row) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return res, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) { | 
					
						
							| 
									
										
										
										
											2024-10-10 04:32:09 +08:00
										 |  |  | 	_, span := b.tracer.Start(ctx, tracePrefix+"WriteEvent") | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	defer span.End() | 
					
						
							|  |  |  | 	// TODO: validate key ?
 | 
					
						
							|  |  |  | 	switch event.Type { | 
					
						
							|  |  |  | 	case resource.WatchEvent_ADDED: | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | 		if event.ObjectOld != nil { | 
					
						
							|  |  |  | 			return b.restore(ctx, event) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		return b.create(ctx, event) | 
					
						
							|  |  |  | 	case resource.WatchEvent_MODIFIED: | 
					
						
							|  |  |  | 		return b.update(ctx, event) | 
					
						
							|  |  |  | 	case resource.WatchEvent_DELETED: | 
					
						
							|  |  |  | 		return b.delete(ctx, event) | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		return 0, fmt.Errorf("unsupported event type") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) { | 
					
						
							| 
									
										
										
										
											2024-10-10 04:32:09 +08:00
										 |  |  | 	ctx, span := b.tracer.Start(ctx, tracePrefix+"Create") | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	defer span.End() | 
					
						
							|  |  |  | 	var newVersion int64 | 
					
						
							|  |  |  | 	guid := uuid.New().String() | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 		folder := "" | 
					
						
							|  |  |  | 		if event.Object != nil { | 
					
						
							|  |  |  | 			folder = event.Object.GetFolder() | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		// 1. Insert into resource
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			WriteEvent:  event, | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 			Folder:      folder, | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			GUID:        guid, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("insert into resource: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 2. Insert into resource history
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			WriteEvent:  event, | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 			Folder:      folder, | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			GUID:        guid, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("insert into resource history: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		// 4. Atomically increment resource version for this kind
 | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | 		rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 			return fmt.Errorf("increment resource version: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 5. Update the RV in both resource and resource_history
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		if _, err = dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate:     sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			GUID:            guid, | 
					
						
							|  |  |  | 			ResourceVersion: rv, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 			return fmt.Errorf("update resource_history rv: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		if _, err = dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate:     sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			GUID:            guid, | 
					
						
							|  |  |  | 			ResourceVersion: rv, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("update resource rv: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		newVersion = rv | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		return nil | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	return newVersion, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, error) { | 
					
						
							| 
									
										
										
										
											2024-10-10 04:32:09 +08:00
										 |  |  | 	ctx, span := b.tracer.Start(ctx, tracePrefix+"Update") | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	defer span.End() | 
					
						
							|  |  |  | 	var newVersion int64 | 
					
						
							|  |  |  | 	guid := uuid.New().String() | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 		folder := "" | 
					
						
							|  |  |  | 		if event.Object != nil { | 
					
						
							|  |  |  | 			folder = event.Object.GetFolder() | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		// 1. Update resource
 | 
					
						
							|  |  |  | 		_, err := dbutil.Exec(ctx, tx, sqlResourceUpdate, sqlResourceRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			WriteEvent:  event, | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 			Folder:      folder, | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			GUID:        guid, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 			return fmt.Errorf("initial resource update: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 2. Insert into resource history
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			WriteEvent:  event, | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 			Folder:      folder, | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			GUID:        guid, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("insert into resource history: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		// 4. Atomically increment resource version for this kind
 | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | 		rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 			return fmt.Errorf("increment resource version: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 5. Update the RV in both resource and resource_history
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		if _, err = dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate:     sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			GUID:            guid, | 
					
						
							|  |  |  | 			ResourceVersion: rv, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("update history rv: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		if _, err = dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate:     sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			GUID:            guid, | 
					
						
							|  |  |  | 			ResourceVersion: rv, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("update resource rv: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		newVersion = rv | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return newVersion, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, error) { | 
					
						
							| 
									
										
										
										
											2024-10-10 04:32:09 +08:00
										 |  |  | 	ctx, span := b.tracer.Start(ctx, tracePrefix+"Delete") | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	defer span.End() | 
					
						
							|  |  |  | 	var newVersion int64 | 
					
						
							|  |  |  | 	guid := uuid.New().String() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 		folder := "" | 
					
						
							|  |  |  | 		if event.Object != nil { | 
					
						
							|  |  |  | 			folder = event.Object.GetFolder() | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		// 1. delete from resource
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		_, err := dbutil.Exec(ctx, tx, sqlResourceDelete, sqlResourceRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			WriteEvent:  event, | 
					
						
							|  |  |  | 			GUID:        guid, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("delete resource: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		// 2. Add event to resource history
 | 
					
						
							|  |  |  | 		if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			WriteEvent:  event, | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 			Folder:      folder, | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			GUID:        guid, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("insert into resource history: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		// 4. Atomically increment resource version for this kind
 | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | 		rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 			return fmt.Errorf("increment resource version: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 5. Update the RV in resource_history
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		if _, err = dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate:     sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			GUID:            guid, | 
					
						
							|  |  |  | 			ResourceVersion: rv, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("update history rv: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		newVersion = rv | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		return nil | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return newVersion, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64, error) { | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(ctx, tracePrefix+"Restore") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 	var newVersion int64 | 
					
						
							|  |  |  | 	guid := uuid.New().String() | 
					
						
							|  |  |  | 	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { | 
					
						
							|  |  |  | 		folder := "" | 
					
						
							|  |  |  | 		if event.Object != nil { | 
					
						
							|  |  |  | 			folder = event.Object.GetFolder() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 1. Re-create resource
 | 
					
						
							|  |  |  | 		// Note: we may want to replace the write event with a create event, tbd.
 | 
					
						
							|  |  |  | 		if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{ | 
					
						
							|  |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			WriteEvent:  event, | 
					
						
							|  |  |  | 			Folder:      folder, | 
					
						
							|  |  |  | 			GUID:        guid, | 
					
						
							|  |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("insert into resource: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 2. Insert into resource history
 | 
					
						
							|  |  |  | 		if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ | 
					
						
							|  |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			WriteEvent:  event, | 
					
						
							|  |  |  | 			Folder:      folder, | 
					
						
							|  |  |  | 			GUID:        guid, | 
					
						
							|  |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("insert into resource history: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 4. Atomically increment resource version for this kind
 | 
					
						
							|  |  |  | 		rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("increment resource version: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 5. Update the RV in both resource and resource_history
 | 
					
						
							|  |  |  | 		if _, err = dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{ | 
					
						
							|  |  |  | 			SQLTemplate:     sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			GUID:            guid, | 
					
						
							|  |  |  | 			ResourceVersion: rv, | 
					
						
							|  |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("update history rv: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if _, err = dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{ | 
					
						
							|  |  |  | 			SQLTemplate:     sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			GUID:            guid, | 
					
						
							|  |  |  | 			ResourceVersion: rv, | 
					
						
							|  |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("update resource rv: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// 6. Update all resource history entries with the new UID
 | 
					
						
							|  |  |  | 		// Note: we do not update any history entries that have a deletion timestamp included. This will become
 | 
					
						
							|  |  |  | 		// important once we start using finalizers, as the initial delete will show up as an update with a deletion timestamp included.
 | 
					
						
							|  |  |  | 		if _, err = dbutil.Exec(ctx, tx, sqlResoureceHistoryUpdateUid, sqlResourceHistoryUpdateRequest{ | 
					
						
							|  |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			WriteEvent:  event, | 
					
						
							|  |  |  | 			OldUID:      string(event.ObjectOld.GetUID()), | 
					
						
							|  |  |  | 			NewUID:      string(event.Object.GetUID()), | 
					
						
							|  |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return fmt.Errorf("update history uid: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		newVersion = rv | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return newVersion, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *resource.BackendReadResponse { | 
					
						
							| 
									
										
										
										
											2024-10-10 04:32:09 +08:00
										 |  |  | 	_, span := b.tracer.Start(ctx, tracePrefix+".Read") | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// TODO: validate key ?
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 	readReq := &sqlResourceReadRequest{ | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 		SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 		Request:     req, | 
					
						
							|  |  |  | 		Response:    NewReadResponse(), | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	sr := sqlResourceRead | 
					
						
							|  |  |  | 	if req.ResourceVersion > 0 { | 
					
						
							|  |  |  | 		// read a specific version
 | 
					
						
							|  |  |  | 		sr = sqlResourceHistoryRead | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 	var res *resource.BackendReadResponse | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { | 
					
						
							|  |  |  | 		var err error | 
					
						
							|  |  |  | 		res, err = dbutil.QueryRow(ctx, tx, sr, readReq) | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | 		// if not found, look for latest deleted version (if requested)
 | 
					
						
							|  |  |  | 		if errors.Is(err, sql.ErrNoRows) && req.IncludeDeleted { | 
					
						
							|  |  |  | 			sr = sqlResourceHistoryRead | 
					
						
							|  |  |  | 			readReq2 := &sqlResourceReadRequest{ | 
					
						
							|  |  |  | 				SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 				Request:     req, | 
					
						
							|  |  |  | 				Response:    NewReadResponse(), | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			res, err = dbutil.QueryRow(ctx, tx, sr, readReq2) | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 		return err | 
					
						
							|  |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	if errors.Is(err, sql.ErrNoRows) { | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 		return &resource.BackendReadResponse{ | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 			Error: resource.NewNotFoundError(req.Key), | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} else if err != nil { | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 		return &resource.BackendReadResponse{Error: resource.AsErrorResult(err)} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 	return res | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | func (b *backend) ListIterator(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) { | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	ctx, span := b.tracer.Start(ctx, tracePrefix+"List") | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 	if req.Options == nil || req.Options.Key.Group == "" || req.Options.Key.Resource == "" { | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		return 0, fmt.Errorf("missing group or resource") | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	if req.Source != resource.ListRequest_STORE { | 
					
						
							|  |  |  | 		return b.getHistory(ctx, req, cb) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	// TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only).
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	// TODO: add support for RemainingItemCount
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	if req.ResourceVersion > 0 || req.NextPageToken != "" { | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		return b.listAtRevision(ctx, req, cb) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	return b.listLatest(ctx, req, cb) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type listIter struct { | 
					
						
							| 
									
										
										
										
											2024-10-17 19:41:06 +08:00
										 |  |  | 	rows   db.Rows | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	offset int64 | 
					
						
							|  |  |  | 	listRV int64 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// any error
 | 
					
						
							|  |  |  | 	err error | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// The row
 | 
					
						
							|  |  |  | 	rv        int64 | 
					
						
							|  |  |  | 	value     []byte | 
					
						
							|  |  |  | 	namespace string | 
					
						
							|  |  |  | 	name      string | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 	folder    string | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ContinueToken implements resource.ListIterator.
 | 
					
						
							|  |  |  | func (l *listIter) ContinueToken() string { | 
					
						
							|  |  |  | 	return ContinueToken{ResourceVersion: l.listRV, StartOffset: l.offset}.String() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (l *listIter) Error() error { | 
					
						
							|  |  |  | 	return l.err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (l *listIter) Name() string { | 
					
						
							|  |  |  | 	return l.name | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (l *listIter) Namespace() string { | 
					
						
							|  |  |  | 	return l.namespace | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | func (l *listIter) Folder() string { | 
					
						
							|  |  |  | 	return l.folder | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | // ResourceVersion implements resource.ListIterator.
 | 
					
						
							|  |  |  | func (l *listIter) ResourceVersion() int64 { | 
					
						
							|  |  |  | 	return l.rv | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Value implements resource.ListIterator.
 | 
					
						
							|  |  |  | func (l *listIter) Value() []byte { | 
					
						
							|  |  |  | 	return l.value | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Next implements resource.ListIterator.
 | 
					
						
							|  |  |  | func (l *listIter) Next() bool { | 
					
						
							|  |  |  | 	if l.rows.Next() { | 
					
						
							|  |  |  | 		l.offset++ | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 		l.err = l.rows.Scan(&l.rv, &l.namespace, &l.name, &l.folder, &l.value) | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		return true | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return false | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var _ resource.ListIterator = (*listIter)(nil) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | // listLatest fetches the resources from the resource table.
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) { | 
					
						
							|  |  |  | 	if req.NextPageToken != "" { | 
					
						
							|  |  |  | 		return 0, fmt.Errorf("only works for the first page") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if req.ResourceVersion > 0 { | 
					
						
							|  |  |  | 		return 0, fmt.Errorf("only works for the 'latest' resource version") | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	iter := &listIter{} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		var err error | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		iter.listRV, err = fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		listReq := sqlResourceListRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			Request:     new(resource.ListRequest), | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		listReq.Request = proto.Clone(req).(*resource.ListRequest) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		rows, err := dbutil.QueryRows(ctx, tx, sqlResourceList, listReq) | 
					
						
							|  |  |  | 		if rows != nil { | 
					
						
							|  |  |  | 			defer func() { | 
					
						
							|  |  |  | 				if err := rows.Close(); err != nil { | 
					
						
							|  |  |  | 					b.log.Warn("listLatest error closing rows", "error", err) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			}() | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		iter.rows = rows | 
					
						
							|  |  |  | 		return cb(iter) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	return iter.listRV, err | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // listAtRevision fetches the resources from the resource_history table at a specific revision.
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) { | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	// Get the RV
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	iter := &listIter{listRV: req.ResourceVersion} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	if req.NextPageToken != "" { | 
					
						
							|  |  |  | 		continueToken, err := GetContinueToken(req.NextPageToken) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 			return 0, fmt.Errorf("get continue token: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		iter.listRV = continueToken.ResourceVersion | 
					
						
							|  |  |  | 		iter.offset = continueToken.StartOffset | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		if req.ResourceVersion != 0 && req.ResourceVersion != iter.listRV { | 
					
						
							|  |  |  | 			return 0, apierrors.NewBadRequest("request resource version does not math token") | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if iter.listRV < 1 { | 
					
						
							|  |  |  | 		return 0, apierrors.NewBadRequest("expecting an explicit resource version query") | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		limit := int64(0) // ignore limit
 | 
					
						
							|  |  |  | 		if iter.offset > 0 { | 
					
						
							|  |  |  | 			limit = math.MaxInt64 // a limit is required for offset
 | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		listReq := sqlResourceHistoryListRequest{ | 
					
						
							| 
									
										
										
										
											2024-08-16 20:12:37 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 			Request: &historyListRequest{ | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 				ResourceVersion: iter.listRV, | 
					
						
							|  |  |  | 				Limit:           limit, | 
					
						
							|  |  |  | 				Offset:          iter.offset, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 				Options:         req.Options, | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryList, listReq) | 
					
						
							|  |  |  | 		if rows != nil { | 
					
						
							|  |  |  | 			defer func() { | 
					
						
							|  |  |  | 				if err := rows.Close(); err != nil { | 
					
						
							|  |  |  | 					b.log.Warn("listAtRevision error closing rows", "error", err) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			}() | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 		iter.rows = rows | 
					
						
							|  |  |  | 		return cb(iter) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	return iter.listRV, err | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | // listLatest fetches the resources from the resource table.
 | 
					
						
							|  |  |  | func (b *backend) getHistory(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) { | 
					
						
							|  |  |  | 	listReq := sqlGetHistoryRequest{ | 
					
						
							|  |  |  | 		SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 		Key:         req.Options.Key, | 
					
						
							|  |  |  | 		Trash:       req.Source == resource.ListRequest_TRASH, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	iter := &listIter{} | 
					
						
							|  |  |  | 	if req.NextPageToken != "" { | 
					
						
							|  |  |  | 		continueToken, err := GetContinueToken(req.NextPageToken) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return 0, fmt.Errorf("get continue token: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		listReq.StartRV = continueToken.ResourceVersion | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { | 
					
						
							|  |  |  | 		var err error | 
					
						
							|  |  |  | 		iter.listRV, err = fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryGet, listReq) | 
					
						
							|  |  |  | 		if rows != nil { | 
					
						
							|  |  |  | 			defer func() { | 
					
						
							|  |  |  | 				if err := rows.Close(); err != nil { | 
					
						
							|  |  |  | 					b.log.Warn("listLatest error closing rows", "error", err) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			}() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		iter.rows = rows | 
					
						
							|  |  |  | 		return cb(iter) | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	return iter.listRV, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) { | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 	// Get the latest RV
 | 
					
						
							|  |  |  | 	since, err := b.listLatestRVs(ctx) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 		return nil, fmt.Errorf("get the latest resource version: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	// Start the poller
 | 
					
						
							|  |  |  | 	stream := make(chan *resource.WrittenEvent) | 
					
						
							|  |  |  | 	go b.poller(ctx, since, stream) | 
					
						
							|  |  |  | 	return stream, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | func (b *backend) poller(ctx context.Context, since groupResourceRV, stream chan<- *resource.WrittenEvent) { | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	t := time.NewTicker(b.pollingInterval) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	defer close(stream) | 
					
						
							|  |  |  | 	defer t.Stop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		case <-b.done: | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 			return | 
					
						
							|  |  |  | 		case <-t.C: | 
					
						
							| 
									
										
										
										
											2024-10-25 03:49:38 +08:00
										 |  |  | 			ctx, span := b.tracer.Start(ctx, tracePrefix+"poller") | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 			// List the latest RVs
 | 
					
						
							|  |  |  | 			grv, err := b.listLatestRVs(ctx) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 			if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 				b.log.Error("get the latest resource version", "err", err) | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 				t.Reset(b.pollingInterval) | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			for group, items := range grv { | 
					
						
							|  |  |  | 				for resource := range items { | 
					
						
							|  |  |  | 					// If we haven't seen this resource before, we start from 0
 | 
					
						
							|  |  |  | 					if _, ok := since[group]; !ok { | 
					
						
							|  |  |  | 						since[group] = make(map[string]int64) | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					if _, ok := since[group][resource]; !ok { | 
					
						
							|  |  |  | 						since[group][resource] = 0 | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 					// Poll for new events
 | 
					
						
							|  |  |  | 					next, err := b.poll(ctx, group, resource, since[group][resource], stream) | 
					
						
							|  |  |  | 					if err != nil { | 
					
						
							|  |  |  | 						b.log.Error("polling for resource", "err", err) | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 						t.Reset(b.pollingInterval) | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 						continue | 
					
						
							|  |  |  | 					} | 
					
						
							| 
									
										
										
										
											2024-09-20 21:51:09 +08:00
										 |  |  | 					if next > since[group][resource] { | 
					
						
							|  |  |  | 						since[group][resource] = next | 
					
						
							|  |  |  | 					} | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 				} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 			t.Reset(b.pollingInterval) | 
					
						
							| 
									
										
										
										
											2024-10-25 03:49:38 +08:00
										 |  |  | 			span.End() | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | // listLatestRVs returns the latest resource version for each (Group, Resource) pair.
 | 
					
						
							|  |  |  | func (b *backend) listLatestRVs(ctx context.Context) (groupResourceRV, error) { | 
					
						
							| 
									
										
										
										
											2024-10-25 03:49:38 +08:00
										 |  |  | 	ctx, span := b.tracer.Start(ctx, tracePrefix+"listLatestRVs") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 	var grvs []*groupResourceVersion | 
					
						
							|  |  |  | 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { | 
					
						
							|  |  |  | 		var err error | 
					
						
							|  |  |  | 		grvs, err = dbutil.Query(ctx, tx, sqlResourceVersionList, &sqlResourceVersionListRequest{ | 
					
						
							|  |  |  | 			SQLTemplate:          sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			groupResourceVersion: new(groupResourceVersion), | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 		return nil, err | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 	since := groupResourceRV{} | 
					
						
							|  |  |  | 	for _, grv := range grvs { | 
					
						
							|  |  |  | 		if since[grv.Group] == nil { | 
					
						
							|  |  |  | 			since[grv.Group] = map[string]int64{} | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 		since[grv.Group][grv.Resource] = grv.ResourceVersion | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 	return since, nil | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | // fetchLatestRV returns the current maximum RV in the resource table
 | 
					
						
							|  |  |  | func fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, group, resource string) (int64, error) { | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 	res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ | 
					
						
							|  |  |  | 		SQLTemplate: sqltemplate.New(d), | 
					
						
							|  |  |  | 		Group:       group, | 
					
						
							|  |  |  | 		Resource:    resource, | 
					
						
							|  |  |  | 		ReadOnly:    true, | 
					
						
							|  |  |  | 		Response:    new(resourceVersionResponse), | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 	if errors.Is(err, sql.ErrNoRows) { | 
					
						
							| 
									
										
										
										
											2024-07-26 00:17:39 +08:00
										 |  |  | 		return 1, nil | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 	} else if err != nil { | 
					
						
							|  |  |  | 		return 0, fmt.Errorf("get resource version: %w", err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return res.ResourceVersion, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (b *backend) poll(ctx context.Context, grp string, res string, since int64, stream chan<- *resource.WrittenEvent) (int64, error) { | 
					
						
							| 
									
										
										
										
											2024-10-10 04:32:09 +08:00
										 |  |  | 	ctx, span := b.tracer.Start(ctx, tracePrefix+"poll") | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	defer span.End() | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 	var records []*historyPollResponse | 
					
						
							|  |  |  | 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error { | 
					
						
							|  |  |  | 		var err error | 
					
						
							|  |  |  | 		records, err = dbutil.Query(ctx, tx, sqlResourceHistoryPoll, &sqlResourceHistoryPollRequest{ | 
					
						
							|  |  |  | 			SQLTemplate:          sqltemplate.New(b.dialect), | 
					
						
							|  |  |  | 			Resource:             res, | 
					
						
							|  |  |  | 			Group:                grp, | 
					
						
							|  |  |  | 			SinceResourceVersion: since, | 
					
						
							|  |  |  | 			Response:             &historyPollResponse{}, | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 		return 0, fmt.Errorf("poll history: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 	var nextRV int64 | 
					
						
							|  |  |  | 	for _, rec := range records { | 
					
						
							|  |  |  | 		if rec.Key.Group == "" || rec.Key.Resource == "" || rec.Key.Name == "" { | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 			return nextRV, fmt.Errorf("missing key in response") | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 		nextRV = rec.ResourceVersion | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 		prevRV := rec.PreviousRV | 
					
						
							|  |  |  | 		if prevRV == nil { | 
					
						
							| 
									
										
										
										
											2024-11-28 21:28:55 +08:00
										 |  |  | 			prevRV = new(int64) | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		stream <- &resource.WrittenEvent{ | 
					
						
							|  |  |  | 			WriteEvent: resource.WriteEvent{ | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 				Value: rec.Value, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 				Key: &resource.ResourceKey{ | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 					Namespace: rec.Key.Namespace, | 
					
						
							|  |  |  | 					Group:     rec.Key.Group, | 
					
						
							|  |  |  | 					Resource:  rec.Key.Resource, | 
					
						
							|  |  |  | 					Name:      rec.Key.Name, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 				}, | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 				Type:       resource.WatchEvent_Type(rec.Action), | 
					
						
							|  |  |  | 				PreviousRV: *prevRV, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 			}, | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 			Folder:          rec.Folder, | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 			ResourceVersion: rec.ResourceVersion, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 			// Timestamp:  , // TODO: add timestamp
 | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-08-20 20:29:06 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-22 21:07:12 +08:00
										 |  |  | 	return nextRV, nil | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | // resourceVersionAtomicInc atomically increases the version of a kind within a transaction.
 | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | // TODO: Ideally we should attempt to update the RV in the resource and resource_history tables
 | 
					
						
							|  |  |  | // in a single roundtrip. This would reduce the latency of the operation, and also increase the
 | 
					
						
							|  |  |  | // throughput of the system. This is a good candidate for a future optimization.
 | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | func (b *backend) resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, key *resource.ResourceKey) (newVersion int64, err error) { | 
					
						
							| 
									
										
										
										
											2024-10-24 23:12:20 +08:00
										 |  |  | 	ctx, span := b.tracer.Start(ctx, tracePrefix+"version_atomic_inc", trace.WithAttributes( | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | 		semconv.K8SNamespaceName(key.Namespace), | 
					
						
							|  |  |  | 		// TODO: the following attributes could use some standardization.
 | 
					
						
							|  |  |  | 		attribute.String("k8s.resource.group", key.Group), | 
					
						
							|  |  |  | 		attribute.String("k8s.resource.type", key.Resource), | 
					
						
							|  |  |  | 	)) | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 	// 1. Lock to row and prevent concurrent updates until the transaction is committed.
 | 
					
						
							|  |  |  | 	res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | 		SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 		Group:       key.Group, | 
					
						
							|  |  |  | 		Resource:    key.Resource, | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		Response: new(resourceVersionResponse), ReadOnly: false, // This locks the row for update
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	if errors.Is(err, sql.ErrNoRows) { | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 		// if there wasn't a row associated with the given resource, then we create it.
 | 
					
						
							|  |  |  | 		if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionUpsertRequest{ | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 			Group:       key.Group, | 
					
						
							|  |  |  | 			Resource:    key.Resource, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return 0, fmt.Errorf("insert into resource_version: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 		res, err = dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{ | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | 			SQLTemplate: sqltemplate.New(b.dialect), | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 			Group:       key.Group, | 
					
						
							|  |  |  | 			Resource:    key.Resource, | 
					
						
							|  |  |  | 			Response:    new(resourceVersionResponse), | 
					
						
							|  |  |  | 			ReadOnly:    true, // This locks the row for update
 | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return 0, fmt.Errorf("fetching RV after read") | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return res.ResourceVersion, nil | 
					
						
							|  |  |  | 	} else if err != nil { | 
					
						
							|  |  |  | 		return 0, fmt.Errorf("lock the resource version: %w", err) | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 	// 2. Update the RV
 | 
					
						
							|  |  |  | 	// Most times, the RV is the current microsecond timestamp generated on the sql server (to avoid clock skew).
 | 
					
						
							|  |  |  | 	// In rare occasion, the server clock might go back in time. In those cases, we simply increment the
 | 
					
						
							|  |  |  | 	// previous RV until the clock catches up.
 | 
					
						
							|  |  |  | 	nextRV := max(res.CurrentEpoch, res.ResourceVersion+1) | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 	_, err = dbutil.Exec(ctx, x, sqlResourceVersionUpdate, sqlResourceVersionUpsertRequest{ | 
					
						
							| 
									
										
										
										
											2024-10-18 11:32:08 +08:00
										 |  |  | 		SQLTemplate:     sqltemplate.New(b.dialect), | 
					
						
							| 
									
										
										
										
											2024-10-11 17:11:33 +08:00
										 |  |  | 		Group:           key.Group, | 
					
						
							|  |  |  | 		Resource:        key.Resource, | 
					
						
							|  |  |  | 		ResourceVersion: nextRV, | 
					
						
							| 
									
										
										
										
											2024-07-18 23:03:18 +08:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return 0, fmt.Errorf("increase resource version: %w", err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nextRV, nil | 
					
						
							|  |  |  | } |