2024-07-18 23:03:18 +08:00
package sql
import (
"context"
"database/sql"
"errors"
"fmt"
2024-07-31 17:05:59 +08:00
"math"
2024-07-23 01:08:30 +08:00
"sync"
2024-07-18 23:03:18 +08:00
"time"
"github.com/google/uuid"
2024-10-18 11:32:08 +08:00
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
2024-10-10 04:32:09 +08:00
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/protobuf/proto"
apierrors "k8s.io/apimachinery/pkg/api/errors"
2024-07-18 23:03:18 +08:00
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
2024-07-23 01:08:30 +08:00
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
2024-07-18 23:03:18 +08:00
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
2024-10-10 04:32:09 +08:00
const tracePrefix = "sql.resource."
2024-10-07 16:01:53 +08:00
const defaultPollingInterval = 100 * time . Millisecond
2024-07-18 23:03:18 +08:00
2024-07-23 01:08:30 +08:00
type Backend interface {
resource . StorageBackend
resource . DiagnosticsServer
resource . LifecycleHooks
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
type BackendOptions struct {
2025-01-23 18:34:48 +08:00
DBProvider db . DBProvider
Tracer trace . Tracer
PollingInterval time . Duration
2024-07-23 01:08:30 +08:00
}
2024-07-18 23:03:18 +08:00
2024-07-23 01:08:30 +08:00
func NewBackend ( opts BackendOptions ) ( Backend , error ) {
if opts . DBProvider == nil {
return nil , errors . New ( "no db provider" )
}
2024-07-18 23:03:18 +08:00
if opts . Tracer == nil {
opts . Tracer = noop . NewTracerProvider ( ) . Tracer ( "sql-backend" )
}
2024-07-23 01:08:30 +08:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
2024-07-18 23:03:18 +08:00
2024-10-07 16:01:53 +08:00
pollingInterval := opts . PollingInterval
if pollingInterval == 0 {
pollingInterval = defaultPollingInterval
}
2024-07-18 23:03:18 +08:00
return & backend {
2025-01-23 18:34:48 +08:00
done : ctx . Done ( ) ,
cancel : cancel ,
log : log . New ( "sql-resource-server" ) ,
tracer : opts . Tracer ,
dbProvider : opts . DBProvider ,
pollingInterval : pollingInterval ,
2025-02-12 01:57:46 +08:00
batchLock : & batchLock { running : make ( map [ string ] bool ) } ,
2024-07-18 23:03:18 +08:00
} , nil
}
type backend struct {
2024-07-23 01:08:30 +08:00
// server lifecycle
done <- chan struct { }
cancel context . CancelFunc
initOnce sync . Once
initErr error
// o11y
log log . Logger
tracer trace . Tracer
// database
2025-01-23 18:34:48 +08:00
dbProvider db . DBProvider
db db . DB
dialect sqltemplate . Dialect
2025-02-12 01:57:46 +08:00
batchLock * batchLock
2024-07-23 01:08:30 +08:00
// watch streaming
2024-07-18 23:03:18 +08:00
//stream chan *resource.WatchEvent
2024-10-07 16:01:53 +08:00
pollingInterval time . Duration
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
func ( b * backend ) Init ( ctx context . Context ) error {
b . initOnce . Do ( func ( ) {
b . initErr = b . initLocked ( ctx )
} )
return b . initErr
}
2024-07-18 23:03:18 +08:00
2024-07-23 01:08:30 +08:00
func ( b * backend ) initLocked ( ctx context . Context ) error {
db , err := b . dbProvider . Init ( ctx )
2024-07-18 23:03:18 +08:00
if err != nil {
2024-07-23 01:08:30 +08:00
return fmt . Errorf ( "initialize resource DB: %w" , err )
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
b . db = db
2024-07-18 23:03:18 +08:00
2024-07-23 01:08:30 +08:00
driverName := db . DriverName ( )
b . dialect = sqltemplate . DialectForDriver ( driverName )
if b . dialect == nil {
2024-07-18 23:03:18 +08:00
return fmt . Errorf ( "no dialect for driver %q" , driverName )
}
2024-07-23 01:08:30 +08:00
return b . db . PingContext ( ctx )
2024-07-18 23:03:18 +08:00
}
func ( b * backend ) IsHealthy ( ctx context . Context , r * resource . HealthCheckRequest ) ( * resource . HealthCheckResponse , error ) {
// ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))
2024-07-23 01:08:30 +08:00
if err := b . db . PingContext ( ctx ) ; err != nil {
2024-07-18 23:03:18 +08:00
return nil , err
}
2024-07-23 01:08:30 +08:00
2024-07-18 23:03:18 +08:00
return & resource . HealthCheckResponse { Status : resource . HealthCheckResponse_SERVING } , nil
}
2024-07-23 01:08:30 +08:00
func ( b * backend ) Stop ( _ context . Context ) error {
2024-07-18 23:03:18 +08:00
b . cancel ( )
2024-07-23 01:08:30 +08:00
return nil
2024-07-18 23:03:18 +08:00
}
2024-12-04 01:20:27 +08:00
// GetResourceStats implements Backend.
2024-12-05 18:58:13 +08:00
func ( b * backend ) GetResourceStats ( ctx context . Context , namespace string , minCount int ) ( [ ] resource . ResourceStats , error ) {
2024-12-10 12:32:19 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + ".GetResourceStats" )
2024-12-04 01:20:27 +08:00
defer span . End ( )
req := & sqlStatsRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2024-12-05 18:58:13 +08:00
Namespace : namespace ,
2024-12-04 01:20:27 +08:00
MinCount : minCount , // not used in query... yet?
}
res := make ( [ ] resource . ResourceStats , 0 , 100 )
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
rows , err := dbutil . QueryRows ( ctx , tx , sqlResourceStats , req )
if err != nil {
return err
}
for rows . Next ( ) {
row := resource . ResourceStats { }
err = rows . Scan ( & row . Namespace , & row . Group , & row . Resource , & row . Count , & row . ResourceVersion )
if err != nil {
return err
}
if row . Count > int64 ( minCount ) {
res = append ( res , row )
}
}
return err
} )
return res , err
}
2024-07-18 23:03:18 +08:00
func ( b * backend ) WriteEvent ( ctx context . Context , event resource . WriteEvent ) ( int64 , error ) {
2024-10-10 04:32:09 +08:00
_ , span := b . tracer . Start ( ctx , tracePrefix + "WriteEvent" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
// TODO: validate key ?
switch event . Type {
case resource . WatchEvent_ADDED :
2024-12-14 06:55:43 +08:00
if event . ObjectOld != nil {
return b . restore ( ctx , event )
}
2024-07-18 23:03:18 +08:00
return b . create ( ctx , event )
case resource . WatchEvent_MODIFIED :
return b . update ( ctx , event )
case resource . WatchEvent_DELETED :
return b . delete ( ctx , event )
default :
return 0 , fmt . Errorf ( "unsupported event type" )
}
}
func ( b * backend ) create ( ctx context . Context , event resource . WriteEvent ) ( int64 , error ) {
2024-10-10 04:32:09 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "Create" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
var newVersion int64
guid := uuid . New ( ) . String ( )
2024-07-23 01:08:30 +08:00
err := b . db . WithTx ( ctx , ReadCommitted , func ( ctx context . Context , tx db . Tx ) error {
2024-11-12 19:52:04 +08:00
folder := ""
if event . Object != nil {
folder = event . Object . GetFolder ( )
}
2024-07-18 23:03:18 +08:00
// 1. Insert into resource
2024-07-23 01:08:30 +08:00
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceInsert , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2024-08-16 20:12:37 +08:00
GUID : guid ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
return fmt . Errorf ( "insert into resource: %w" , err )
}
// 2. Insert into resource history
2024-07-23 01:08:30 +08:00
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceHistoryInsert , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2024-08-16 20:12:37 +08:00
GUID : guid ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
return fmt . Errorf ( "insert into resource history: %w" , err )
}
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
2024-07-23 01:08:30 +08:00
// 4. Atomically increment resource version for this kind
2024-10-18 11:32:08 +08:00
rv , err := b . resourceVersionAtomicInc ( ctx , tx , event . Key )
2024-07-18 23:03:18 +08:00
if err != nil {
2024-07-23 01:08:30 +08:00
return fmt . Errorf ( "increment resource version: %w" , err )
2024-07-18 23:03:18 +08:00
}
// 5. Update the RV in both resource and resource_history
2024-07-23 01:08:30 +08:00
if _ , err = dbutil . Exec ( ctx , tx , sqlResourceHistoryUpdateRV , sqlResourceUpdateRVRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
GUID : guid ,
ResourceVersion : rv ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
2024-07-23 01:08:30 +08:00
return fmt . Errorf ( "update resource_history rv: %w" , err )
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
if _ , err = dbutil . Exec ( ctx , tx , sqlResourceUpdateRV , sqlResourceUpdateRVRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
GUID : guid ,
ResourceVersion : rv ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
return fmt . Errorf ( "update resource rv: %w" , err )
}
2024-07-23 01:08:30 +08:00
newVersion = rv
2024-07-18 23:03:18 +08:00
return nil
} )
return newVersion , err
}
func ( b * backend ) update ( ctx context . Context , event resource . WriteEvent ) ( int64 , error ) {
2024-10-10 04:32:09 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "Update" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
var newVersion int64
guid := uuid . New ( ) . String ( )
2024-07-23 01:08:30 +08:00
err := b . db . WithTx ( ctx , ReadCommitted , func ( ctx context . Context , tx db . Tx ) error {
2024-11-12 19:52:04 +08:00
folder := ""
if event . Object != nil {
folder = event . Object . GetFolder ( )
}
2024-07-23 01:08:30 +08:00
// 1. Update resource
_ , err := dbutil . Exec ( ctx , tx , sqlResourceUpdate , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2024-08-16 20:12:37 +08:00
GUID : guid ,
2024-07-18 23:03:18 +08:00
} )
if err != nil {
2024-07-23 01:08:30 +08:00
return fmt . Errorf ( "initial resource update: %w" , err )
2024-07-18 23:03:18 +08:00
}
// 2. Insert into resource history
2024-07-23 01:08:30 +08:00
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceHistoryInsert , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2024-08-16 20:12:37 +08:00
GUID : guid ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
return fmt . Errorf ( "insert into resource history: %w" , err )
}
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
2024-07-23 01:08:30 +08:00
// 4. Atomically increment resource version for this kind
2024-10-18 11:32:08 +08:00
rv , err := b . resourceVersionAtomicInc ( ctx , tx , event . Key )
2024-07-18 23:03:18 +08:00
if err != nil {
2024-07-23 01:08:30 +08:00
return fmt . Errorf ( "increment resource version: %w" , err )
2024-07-18 23:03:18 +08:00
}
// 5. Update the RV in both resource and resource_history
2024-07-23 01:08:30 +08:00
if _ , err = dbutil . Exec ( ctx , tx , sqlResourceHistoryUpdateRV , sqlResourceUpdateRVRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
GUID : guid ,
ResourceVersion : rv ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
return fmt . Errorf ( "update history rv: %w" , err )
}
2024-07-23 01:08:30 +08:00
if _ , err = dbutil . Exec ( ctx , tx , sqlResourceUpdateRV , sqlResourceUpdateRVRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
GUID : guid ,
ResourceVersion : rv ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
return fmt . Errorf ( "update resource rv: %w" , err )
}
2024-07-23 01:08:30 +08:00
newVersion = rv
2024-07-18 23:03:18 +08:00
return nil
} )
return newVersion , err
}
func ( b * backend ) delete ( ctx context . Context , event resource . WriteEvent ) ( int64 , error ) {
2024-10-10 04:32:09 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "Delete" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
var newVersion int64
guid := uuid . New ( ) . String ( )
2024-07-23 01:08:30 +08:00
err := b . db . WithTx ( ctx , ReadCommitted , func ( ctx context . Context , tx db . Tx ) error {
2024-11-12 19:52:04 +08:00
folder := ""
if event . Object != nil {
folder = event . Object . GetFolder ( )
}
2024-07-18 23:03:18 +08:00
// 1. delete from resource
2024-07-23 01:08:30 +08:00
_ , err := dbutil . Exec ( ctx , tx , sqlResourceDelete , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
GUID : guid ,
2024-07-18 23:03:18 +08:00
} )
if err != nil {
return fmt . Errorf ( "delete resource: %w" , err )
}
2024-07-23 01:08:30 +08:00
// 2. Add event to resource history
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceHistoryInsert , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2024-08-16 20:12:37 +08:00
GUID : guid ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
return fmt . Errorf ( "insert into resource history: %w" , err )
}
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
2024-07-23 01:08:30 +08:00
// 4. Atomically increment resource version for this kind
2024-10-18 11:32:08 +08:00
rv , err := b . resourceVersionAtomicInc ( ctx , tx , event . Key )
2024-07-18 23:03:18 +08:00
if err != nil {
2024-07-23 01:08:30 +08:00
return fmt . Errorf ( "increment resource version: %w" , err )
2024-07-18 23:03:18 +08:00
}
// 5. Update the RV in resource_history
2024-07-23 01:08:30 +08:00
if _ , err = dbutil . Exec ( ctx , tx , sqlResourceHistoryUpdateRV , sqlResourceUpdateRVRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
GUID : guid ,
ResourceVersion : rv ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
return fmt . Errorf ( "update history rv: %w" , err )
}
2024-07-23 01:08:30 +08:00
newVersion = rv
2024-07-18 23:03:18 +08:00
return nil
} )
return newVersion , err
}
2024-12-14 06:55:43 +08:00
func ( b * backend ) restore ( ctx context . Context , event resource . WriteEvent ) ( int64 , error ) {
ctx , span := b . tracer . Start ( ctx , tracePrefix + "Restore" )
defer span . End ( )
var newVersion int64
guid := uuid . New ( ) . String ( )
err := b . db . WithTx ( ctx , ReadCommitted , func ( ctx context . Context , tx db . Tx ) error {
folder := ""
if event . Object != nil {
folder = event . Object . GetFolder ( )
}
// 1. Re-create resource
// Note: we may want to replace the write event with a create event, tbd.
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceInsert , sqlResourceRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
Folder : folder ,
GUID : guid ,
} ) ; err != nil {
return fmt . Errorf ( "insert into resource: %w" , err )
}
// 2. Insert into resource history
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceHistoryInsert , sqlResourceRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
Folder : folder ,
GUID : guid ,
} ) ; err != nil {
return fmt . Errorf ( "insert into resource history: %w" , err )
}
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
// 4. Atomically increment resource version for this kind
rv , err := b . resourceVersionAtomicInc ( ctx , tx , event . Key )
if err != nil {
return fmt . Errorf ( "increment resource version: %w" , err )
}
// 5. Update the RV in both resource and resource_history
if _ , err = dbutil . Exec ( ctx , tx , sqlResourceHistoryUpdateRV , sqlResourceUpdateRVRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
GUID : guid ,
ResourceVersion : rv ,
} ) ; err != nil {
return fmt . Errorf ( "update history rv: %w" , err )
}
if _ , err = dbutil . Exec ( ctx , tx , sqlResourceUpdateRV , sqlResourceUpdateRVRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
GUID : guid ,
ResourceVersion : rv ,
} ) ; err != nil {
return fmt . Errorf ( "update resource rv: %w" , err )
}
// 6. Update all resource history entries with the new UID
// Note: we do not update any history entries that have a deletion timestamp included. This will become
// important once we start using finalizers, as the initial delete will show up as an update with a deletion timestamp included.
if _ , err = dbutil . Exec ( ctx , tx , sqlResoureceHistoryUpdateUid , sqlResourceHistoryUpdateRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
OldUID : string ( event . ObjectOld . GetUID ( ) ) ,
NewUID : string ( event . Object . GetUID ( ) ) ,
} ) ; err != nil {
return fmt . Errorf ( "update history uid: %w" , err )
}
newVersion = rv
return nil
} )
return newVersion , err
}
2024-11-12 19:52:04 +08:00
func ( b * backend ) ReadResource ( ctx context . Context , req * resource . ReadRequest ) * resource . BackendReadResponse {
2024-10-10 04:32:09 +08:00
_ , span := b . tracer . Start ( ctx , tracePrefix + ".Read" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
// TODO: validate key ?
2024-08-20 20:29:06 +08:00
readReq := & sqlResourceReadRequest {
2024-11-12 19:52:04 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
Request : req ,
Response : NewReadResponse ( ) ,
2024-07-18 23:03:18 +08:00
}
sr := sqlResourceRead
if req . ResourceVersion > 0 {
// read a specific version
sr = sqlResourceHistoryRead
}
2024-11-12 19:52:04 +08:00
var res * resource . BackendReadResponse
2024-08-20 20:29:06 +08:00
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
var err error
res , err = dbutil . QueryRow ( ctx , tx , sr , readReq )
2024-12-14 06:55:43 +08:00
// if not found, look for latest deleted version (if requested)
if errors . Is ( err , sql . ErrNoRows ) && req . IncludeDeleted {
sr = sqlResourceHistoryRead
readReq2 := & sqlResourceReadRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
Request : req ,
Response : NewReadResponse ( ) ,
}
res , err = dbutil . QueryRow ( ctx , tx , sr , readReq2 )
return err
}
2024-08-20 20:29:06 +08:00
return err
} )
2024-12-14 06:55:43 +08:00
2024-07-18 23:03:18 +08:00
if errors . Is ( err , sql . ErrNoRows ) {
2024-11-12 19:52:04 +08:00
return & resource . BackendReadResponse {
2024-07-30 18:16:16 +08:00
Error : resource . NewNotFoundError ( req . Key ) ,
}
2024-07-18 23:03:18 +08:00
} else if err != nil {
2024-11-12 19:52:04 +08:00
return & resource . BackendReadResponse { Error : resource . AsErrorResult ( err ) }
2024-07-18 23:03:18 +08:00
}
2024-11-12 19:52:04 +08:00
return res
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
func ( b * backend ) ListIterator ( ctx context . Context , req * resource . ListRequest , cb func ( resource . ListIterator ) error ) ( int64 , error ) {
2025-01-17 20:54:25 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "List" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
2024-07-22 21:07:12 +08:00
if req . Options == nil || req . Options . Key . Group == "" || req . Options . Key . Resource == "" {
2024-07-31 17:05:59 +08:00
return 0 , fmt . Errorf ( "missing group or resource" )
2024-07-22 21:07:12 +08:00
}
2025-01-17 20:54:25 +08:00
if req . Source != resource . ListRequest_STORE {
return b . getHistory ( ctx , req , cb )
}
2024-07-18 23:03:18 +08:00
// TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only).
2024-07-23 01:08:30 +08:00
// TODO: add support for RemainingItemCount
2024-07-18 23:03:18 +08:00
if req . ResourceVersion > 0 || req . NextPageToken != "" {
2024-07-31 17:05:59 +08:00
return b . listAtRevision ( ctx , req , cb )
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
return b . listLatest ( ctx , req , cb )
}
type listIter struct {
2024-10-17 19:41:06 +08:00
rows db . Rows
2024-07-31 17:05:59 +08:00
offset int64
listRV int64
// any error
err error
// The row
rv int64
value [ ] byte
namespace string
name string
2024-11-12 19:52:04 +08:00
folder string
2024-07-31 17:05:59 +08:00
}
// ContinueToken implements resource.ListIterator.
func ( l * listIter ) ContinueToken ( ) string {
return ContinueToken { ResourceVersion : l . listRV , StartOffset : l . offset } . String ( )
}
2025-01-28 22:17:52 +08:00
func ( l * listIter ) ContinueTokenWithCurrentRV ( ) string {
return ContinueToken { ResourceVersion : l . rv , StartOffset : l . offset } . String ( )
}
2024-07-31 17:05:59 +08:00
func ( l * listIter ) Error ( ) error {
return l . err
}
func ( l * listIter ) Name ( ) string {
return l . name
}
func ( l * listIter ) Namespace ( ) string {
return l . namespace
2024-07-18 23:03:18 +08:00
}
2024-11-12 19:52:04 +08:00
func ( l * listIter ) Folder ( ) string {
return l . folder
}
2024-07-31 17:05:59 +08:00
// ResourceVersion implements resource.ListIterator.
func ( l * listIter ) ResourceVersion ( ) int64 {
return l . rv
}
// Value implements resource.ListIterator.
func ( l * listIter ) Value ( ) [ ] byte {
return l . value
}
// Next implements resource.ListIterator.
func ( l * listIter ) Next ( ) bool {
if l . rows . Next ( ) {
l . offset ++
2024-11-12 19:52:04 +08:00
l . err = l . rows . Scan ( & l . rv , & l . namespace , & l . name , & l . folder , & l . value )
2024-07-31 17:05:59 +08:00
return true
}
return false
}
var _ resource . ListIterator = ( * listIter ) ( nil )
2024-07-18 23:03:18 +08:00
// listLatest fetches the resources from the resource table.
2024-07-31 17:05:59 +08:00
func ( b * backend ) listLatest ( ctx context . Context , req * resource . ListRequest , cb func ( resource . ListIterator ) error ) ( int64 , error ) {
if req . NextPageToken != "" {
return 0 , fmt . Errorf ( "only works for the first page" )
}
if req . ResourceVersion > 0 {
return 0 , fmt . Errorf ( "only works for the 'latest' resource version" )
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
iter := & listIter { }
2024-07-23 01:08:30 +08:00
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
2024-07-18 23:03:18 +08:00
var err error
2024-07-31 17:05:59 +08:00
iter . listRV , err = fetchLatestRV ( ctx , tx , b . dialect , req . Options . Key . Group , req . Options . Key . Resource )
2024-07-18 23:03:18 +08:00
if err != nil {
return err
}
listReq := sqlResourceListRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
Request : new ( resource . ListRequest ) ,
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
listReq . Request = proto . Clone ( req ) . ( * resource . ListRequest )
2024-07-18 23:03:18 +08:00
2024-07-31 17:05:59 +08:00
rows , err := dbutil . QueryRows ( ctx , tx , sqlResourceList , listReq )
if rows != nil {
defer func ( ) {
if err := rows . Close ( ) ; err != nil {
b . log . Warn ( "listLatest error closing rows" , "error" , err )
}
} ( )
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
if err != nil {
return err
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
iter . rows = rows
return cb ( iter )
2024-07-18 23:03:18 +08:00
} )
2024-07-31 17:05:59 +08:00
return iter . listRV , err
2024-07-18 23:03:18 +08:00
}
// listAtRevision fetches the resources from the resource_history table at a specific revision.
2024-07-31 17:05:59 +08:00
func ( b * backend ) listAtRevision ( ctx context . Context , req * resource . ListRequest , cb func ( resource . ListIterator ) error ) ( int64 , error ) {
2024-07-18 23:03:18 +08:00
// Get the RV
2024-07-31 17:05:59 +08:00
iter := & listIter { listRV : req . ResourceVersion }
2024-07-18 23:03:18 +08:00
if req . NextPageToken != "" {
continueToken , err := GetContinueToken ( req . NextPageToken )
if err != nil {
2024-07-31 17:05:59 +08:00
return 0 , fmt . Errorf ( "get continue token: %w" , err )
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
iter . listRV = continueToken . ResourceVersion
iter . offset = continueToken . StartOffset
2024-07-18 23:03:18 +08:00
2024-07-31 17:05:59 +08:00
if req . ResourceVersion != 0 && req . ResourceVersion != iter . listRV {
return 0 , apierrors . NewBadRequest ( "request resource version does not math token" )
}
}
if iter . listRV < 1 {
return 0 , apierrors . NewBadRequest ( "expecting an explicit resource version query" )
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
2024-07-31 17:05:59 +08:00
limit := int64 ( 0 ) // ignore limit
if iter . offset > 0 {
limit = math . MaxInt64 // a limit is required for offset
}
2024-07-18 23:03:18 +08:00
listReq := sqlResourceHistoryListRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2024-07-18 23:03:18 +08:00
Request : & historyListRequest {
2024-07-31 17:05:59 +08:00
ResourceVersion : iter . listRV ,
Limit : limit ,
Offset : iter . offset ,
2024-07-18 23:03:18 +08:00
Options : req . Options ,
} ,
}
2024-07-23 01:08:30 +08:00
2024-07-31 17:05:59 +08:00
rows , err := dbutil . QueryRows ( ctx , tx , sqlResourceHistoryList , listReq )
if rows != nil {
defer func ( ) {
if err := rows . Close ( ) ; err != nil {
b . log . Warn ( "listAtRevision error closing rows" , "error" , err )
}
} ( )
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
if err != nil {
return err
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
iter . rows = rows
return cb ( iter )
2024-07-18 23:03:18 +08:00
} )
2024-07-31 17:05:59 +08:00
return iter . listRV , err
2024-07-18 23:03:18 +08:00
}
2025-01-17 20:54:25 +08:00
// listLatest fetches the resources from the resource table.
func ( b * backend ) getHistory ( ctx context . Context , req * resource . ListRequest , cb func ( resource . ListIterator ) error ) ( int64 , error ) {
listReq := sqlGetHistoryRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
Key : req . Options . Key ,
Trash : req . Source == resource . ListRequest_TRASH ,
}
iter := & listIter { }
if req . NextPageToken != "" {
continueToken , err := GetContinueToken ( req . NextPageToken )
if err != nil {
return 0 , fmt . Errorf ( "get continue token: %w" , err )
}
listReq . StartRV = continueToken . ResourceVersion
}
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
var err error
iter . listRV , err = fetchLatestRV ( ctx , tx , b . dialect , req . Options . Key . Group , req . Options . Key . Resource )
if err != nil {
return err
}
rows , err := dbutil . QueryRows ( ctx , tx , sqlResourceHistoryGet , listReq )
if rows != nil {
defer func ( ) {
if err := rows . Close ( ) ; err != nil {
b . log . Warn ( "listLatest error closing rows" , "error" , err )
}
} ( )
}
if err != nil {
return err
}
iter . rows = rows
return cb ( iter )
} )
return iter . listRV , err
}
2024-07-18 23:03:18 +08:00
func ( b * backend ) WatchWriteEvents ( ctx context . Context ) ( <- chan * resource . WrittenEvent , error ) {
2024-07-22 21:07:12 +08:00
// Get the latest RV
since , err := b . listLatestRVs ( ctx )
2024-07-18 23:03:18 +08:00
if err != nil {
2025-02-12 01:57:46 +08:00
return nil , fmt . Errorf ( "watch, get latest resource version: %w" , err )
2024-07-18 23:03:18 +08:00
}
// Start the poller
stream := make ( chan * resource . WrittenEvent )
go b . poller ( ctx , since , stream )
return stream , nil
}
2024-07-22 21:07:12 +08:00
func ( b * backend ) poller ( ctx context . Context , since groupResourceRV , stream chan <- * resource . WrittenEvent ) {
2024-10-07 16:01:53 +08:00
t := time . NewTicker ( b . pollingInterval )
2024-07-18 23:03:18 +08:00
defer close ( stream )
defer t . Stop ( )
2025-02-12 01:57:46 +08:00
isSQLite := b . dialect . DialectName ( ) == "sqlite"
2024-07-18 23:03:18 +08:00
for {
select {
2024-07-23 01:08:30 +08:00
case <- b . done :
2024-07-18 23:03:18 +08:00
return
case <- t . C :
2025-02-12 01:57:46 +08:00
// Block polling duffing import to avoid database locked issues
if isSQLite && b . batchLock . Active ( ) {
continue
}
2024-10-25 03:49:38 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "poller" )
2024-07-22 21:07:12 +08:00
// List the latest RVs
grv , err := b . listLatestRVs ( ctx )
2024-07-18 23:03:18 +08:00
if err != nil {
2025-02-12 01:57:46 +08:00
b . log . Error ( "poller get latest resource version" , "err" , err )
2024-10-07 16:01:53 +08:00
t . Reset ( b . pollingInterval )
2024-07-22 21:07:12 +08:00
continue
}
for group , items := range grv {
for resource := range items {
// If we haven't seen this resource before, we start from 0
if _ , ok := since [ group ] ; ! ok {
since [ group ] = make ( map [ string ] int64 )
}
if _ , ok := since [ group ] [ resource ] ; ! ok {
since [ group ] [ resource ] = 0
}
// Poll for new events
next , err := b . poll ( ctx , group , resource , since [ group ] [ resource ] , stream )
if err != nil {
b . log . Error ( "polling for resource" , "err" , err )
2024-10-07 16:01:53 +08:00
t . Reset ( b . pollingInterval )
2024-07-22 21:07:12 +08:00
continue
}
2024-09-20 21:51:09 +08:00
if next > since [ group ] [ resource ] {
since [ group ] [ resource ] = next
}
2024-07-22 21:07:12 +08:00
}
2024-07-18 23:03:18 +08:00
}
2024-07-22 21:07:12 +08:00
2024-10-07 16:01:53 +08:00
t . Reset ( b . pollingInterval )
2024-10-25 03:49:38 +08:00
span . End ( )
2024-07-18 23:03:18 +08:00
}
}
}
2024-07-22 21:07:12 +08:00
// listLatestRVs returns the latest resource version for each (Group, Resource) pair.
func ( b * backend ) listLatestRVs ( ctx context . Context ) ( groupResourceRV , error ) {
2024-10-25 03:49:38 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "listLatestRVs" )
defer span . End ( )
2024-08-20 20:29:06 +08:00
var grvs [ ] * groupResourceVersion
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
var err error
grvs , err = dbutil . Query ( ctx , tx , sqlResourceVersionList , & sqlResourceVersionListRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
groupResourceVersion : new ( groupResourceVersion ) ,
} )
return err
} )
2024-07-18 23:03:18 +08:00
if err != nil {
2024-08-20 20:29:06 +08:00
return nil , err
2024-07-18 23:03:18 +08:00
}
2024-07-22 21:07:12 +08:00
2024-08-20 20:29:06 +08:00
since := groupResourceRV { }
for _ , grv := range grvs {
if since [ grv . Group ] == nil {
since [ grv . Group ] = map [ string ] int64 { }
2024-07-22 21:07:12 +08:00
}
2024-08-20 20:29:06 +08:00
since [ grv . Group ] [ grv . Resource ] = grv . ResourceVersion
2024-07-18 23:03:18 +08:00
}
2024-08-20 20:29:06 +08:00
2024-07-22 21:07:12 +08:00
return since , nil
2024-07-18 23:03:18 +08:00
}
2024-07-22 21:07:12 +08:00
// fetchLatestRV returns the current maximum RV in the resource table
func fetchLatestRV ( ctx context . Context , x db . ContextExecer , d sqltemplate . Dialect , group , resource string ) ( int64 , error ) {
2024-10-11 17:11:33 +08:00
res , err := dbutil . QueryRow ( ctx , x , sqlResourceVersionGet , sqlResourceVersionGetRequest {
SQLTemplate : sqltemplate . New ( d ) ,
Group : group ,
Resource : resource ,
ReadOnly : true ,
Response : new ( resourceVersionResponse ) ,
2024-07-22 21:07:12 +08:00
} )
if errors . Is ( err , sql . ErrNoRows ) {
2024-07-26 00:17:39 +08:00
return 1 , nil
2024-07-22 21:07:12 +08:00
} else if err != nil {
return 0 , fmt . Errorf ( "get resource version: %w" , err )
}
return res . ResourceVersion , nil
}
func ( b * backend ) poll ( ctx context . Context , grp string , res string , since int64 , stream chan <- * resource . WrittenEvent ) ( int64 , error ) {
2024-10-10 04:32:09 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "poll" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
2025-01-29 02:30:20 +08:00
start := time . Now ( )
2024-08-20 20:29:06 +08:00
var records [ ] * historyPollResponse
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
var err error
records , err = dbutil . Query ( ctx , tx , sqlResourceHistoryPoll , & sqlResourceHistoryPollRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
Resource : res ,
Group : grp ,
SinceResourceVersion : since ,
Response : & historyPollResponse { } ,
} )
return err
} )
2024-07-18 23:03:18 +08:00
if err != nil {
2024-08-20 20:29:06 +08:00
return 0 , fmt . Errorf ( "poll history: %w" , err )
2024-07-18 23:03:18 +08:00
}
2025-01-29 02:30:20 +08:00
end := time . Now ( )
resource . NewStorageMetrics ( ) . PollerLatency . Observe ( end . Sub ( start ) . Seconds ( ) )
2024-07-22 21:07:12 +08:00
2024-08-20 20:29:06 +08:00
var nextRV int64
for _ , rec := range records {
if rec . Key . Group == "" || rec . Key . Resource == "" || rec . Key . Name == "" {
2024-07-22 21:07:12 +08:00
return nextRV , fmt . Errorf ( "missing key in response" )
}
2024-08-20 20:29:06 +08:00
nextRV = rec . ResourceVersion
2024-10-07 16:01:53 +08:00
prevRV := rec . PreviousRV
if prevRV == nil {
2024-11-28 21:28:55 +08:00
prevRV = new ( int64 )
2024-10-07 16:01:53 +08:00
}
2024-07-18 23:03:18 +08:00
stream <- & resource . WrittenEvent {
WriteEvent : resource . WriteEvent {
2024-08-20 20:29:06 +08:00
Value : rec . Value ,
2024-07-18 23:03:18 +08:00
Key : & resource . ResourceKey {
2024-08-20 20:29:06 +08:00
Namespace : rec . Key . Namespace ,
Group : rec . Key . Group ,
Resource : rec . Key . Resource ,
Name : rec . Key . Name ,
2024-07-18 23:03:18 +08:00
} ,
2024-10-07 16:01:53 +08:00
Type : resource . WatchEvent_Type ( rec . Action ) ,
PreviousRV : * prevRV ,
2024-07-18 23:03:18 +08:00
} ,
2024-11-12 19:52:04 +08:00
Folder : rec . Folder ,
2024-08-20 20:29:06 +08:00
ResourceVersion : rec . ResourceVersion ,
2024-07-18 23:03:18 +08:00
// Timestamp: , // TODO: add timestamp
}
2025-01-29 02:30:20 +08:00
b . log . Debug ( "poller sent event to stream" , "namespace" , rec . Key . Namespace , "group" , rec . Key . Group , "resource" , rec . Key . Resource , "name" , rec . Key . Name , "action" , rec . Action , "rv" , rec . ResourceVersion )
2024-07-18 23:03:18 +08:00
}
2024-08-20 20:29:06 +08:00
2024-07-22 21:07:12 +08:00
return nextRV , nil
2024-07-18 23:03:18 +08:00
}
2024-10-11 17:11:33 +08:00
// resourceVersionAtomicInc atomically increases the version of a kind within a transaction.
2024-07-18 23:03:18 +08:00
// TODO: Ideally we should attempt to update the RV in the resource and resource_history tables
// in a single roundtrip. This would reduce the latency of the operation, and also increase the
// throughput of the system. This is a good candidate for a future optimization.
2024-10-18 11:32:08 +08:00
func ( b * backend ) resourceVersionAtomicInc ( ctx context . Context , x db . ContextExecer , key * resource . ResourceKey ) ( newVersion int64 , err error ) {
2024-10-24 23:12:20 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "version_atomic_inc" , trace . WithAttributes (
2024-10-18 11:32:08 +08:00
semconv . K8SNamespaceName ( key . Namespace ) ,
// TODO: the following attributes could use some standardization.
attribute . String ( "k8s.resource.group" , key . Group ) ,
attribute . String ( "k8s.resource.type" , key . Resource ) ,
) )
defer span . End ( )
2024-10-11 17:11:33 +08:00
// 1. Lock to row and prevent concurrent updates until the transaction is committed.
res , err := dbutil . QueryRow ( ctx , x , sqlResourceVersionGet , sqlResourceVersionGetRequest {
2024-10-18 11:32:08 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2024-10-11 17:11:33 +08:00
Group : key . Group ,
Resource : key . Resource ,
Response : new ( resourceVersionResponse ) , ReadOnly : false , // This locks the row for update
2024-07-23 01:08:30 +08:00
} )
2024-07-18 23:03:18 +08:00
if errors . Is ( err , sql . ErrNoRows ) {
2024-10-11 17:11:33 +08:00
// if there wasn't a row associated with the given resource, then we create it.
if _ , err = dbutil . Exec ( ctx , x , sqlResourceVersionInsert , sqlResourceVersionUpsertRequest {
2024-10-18 11:32:08 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2024-10-11 17:11:33 +08:00
Group : key . Group ,
Resource : key . Resource ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
return 0 , fmt . Errorf ( "insert into resource_version: %w" , err )
}
2024-10-11 17:11:33 +08:00
res , err = dbutil . QueryRow ( ctx , x , sqlResourceVersionGet , sqlResourceVersionGetRequest {
2024-10-18 11:32:08 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2024-10-11 17:11:33 +08:00
Group : key . Group ,
Resource : key . Resource ,
Response : new ( resourceVersionResponse ) ,
ReadOnly : true , // This locks the row for update
} )
if err != nil {
return 0 , fmt . Errorf ( "fetching RV after read" )
}
return res . ResourceVersion , nil
} else if err != nil {
return 0 , fmt . Errorf ( "lock the resource version: %w" , err )
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
2024-10-11 17:11:33 +08:00
// 2. Update the RV
// Most times, the RV is the current microsecond timestamp generated on the sql server (to avoid clock skew).
// In rare occasion, the server clock might go back in time. In those cases, we simply increment the
// previous RV until the clock catches up.
nextRV := max ( res . CurrentEpoch , res . ResourceVersion + 1 )
2024-07-23 01:08:30 +08:00
2024-10-11 17:11:33 +08:00
_ , err = dbutil . Exec ( ctx , x , sqlResourceVersionUpdate , sqlResourceVersionUpsertRequest {
2024-10-18 11:32:08 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2024-10-11 17:11:33 +08:00
Group : key . Group ,
Resource : key . Resource ,
ResourceVersion : nextRV ,
2024-07-18 23:03:18 +08:00
} )
if err != nil {
return 0 , fmt . Errorf ( "increase resource version: %w" , err )
}
return nextRV , nil
}