2024-07-18 23:03:18 +08:00
package sql
import (
"context"
"database/sql"
"errors"
"fmt"
2025-08-21 01:54:31 +08:00
"iter"
2024-07-31 17:05:59 +08:00
"math"
2024-07-23 01:08:30 +08:00
"sync"
2024-07-18 23:03:18 +08:00
"time"
2025-03-26 21:44:44 +08:00
"github.com/go-sql-driver/mysql"
"github.com/jackc/pgx/v5/pgconn"
2025-03-31 21:06:31 +08:00
"github.com/lib/pq"
2025-03-17 18:36:38 +08:00
"github.com/prometheus/client_golang/prometheus"
2024-10-10 04:32:09 +08:00
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/protobuf/proto"
apierrors "k8s.io/apimachinery/pkg/api/errors"
2025-08-29 20:49:20 +08:00
"github.com/grafana/grafana/pkg/util/sqlite"
2025-04-08 21:35:11 +08:00
"github.com/grafana/grafana-app-sdk/logging"
2025-04-17 18:58:58 +08:00
2024-07-18 23:03:18 +08:00
"github.com/grafana/grafana/pkg/storage/unified/resource"
2025-05-16 03:36:52 +08:00
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
2024-07-18 23:03:18 +08:00
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
2024-07-23 01:08:30 +08:00
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
2024-07-18 23:03:18 +08:00
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
2025-03-17 18:36:38 +08:00
"github.com/grafana/grafana/pkg/util/debouncer"
2024-07-18 23:03:18 +08:00
)
2024-10-10 04:32:09 +08:00
const tracePrefix = "sql.resource."
2024-10-07 16:01:53 +08:00
const defaultPollingInterval = 100 * time . Millisecond
2025-02-14 18:18:59 +08:00
const defaultWatchBufferSize = 100 // number of events to buffer in the watch stream
2025-03-25 23:00:12 +08:00
const defaultPrunerHistoryLimit = 20
2024-07-18 23:03:18 +08:00
2024-07-23 01:08:30 +08:00
type Backend interface {
resource . StorageBackend
2025-05-16 03:36:52 +08:00
resourcepb . DiagnosticsServer
2024-07-23 01:08:30 +08:00
resource . LifecycleHooks
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
type BackendOptions struct {
2025-01-23 18:34:48 +08:00
DBProvider db . DBProvider
Tracer trace . Tracer
2025-03-17 18:36:38 +08:00
Reg prometheus . Registerer
2025-01-23 18:34:48 +08:00
PollingInterval time . Duration
2025-02-14 18:18:59 +08:00
WatchBufferSize int
2025-02-21 19:25:35 +08:00
IsHA bool
2025-02-28 20:39:39 +08:00
storageMetrics * resource . StorageMetrics
2025-02-26 23:17:35 +08:00
2025-03-17 18:36:38 +08:00
// If true, the backend will prune history on write events.
// Will be removed once fully rolled out.
withPruner bool
2025-02-26 23:17:35 +08:00
// testing
SimulatedNetworkLatency time . Duration // slows down the create transactions by a fixed amount
2024-07-23 01:08:30 +08:00
}
2024-07-18 23:03:18 +08:00
2024-07-23 01:08:30 +08:00
func NewBackend ( opts BackendOptions ) ( Backend , error ) {
if opts . DBProvider == nil {
return nil , errors . New ( "no db provider" )
}
2024-07-18 23:03:18 +08:00
if opts . Tracer == nil {
opts . Tracer = noop . NewTracerProvider ( ) . Tracer ( "sql-backend" )
}
2024-07-23 01:08:30 +08:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
2024-07-18 23:03:18 +08:00
2025-02-21 19:25:35 +08:00
if opts . PollingInterval == 0 {
opts . PollingInterval = defaultPollingInterval
2024-10-07 16:01:53 +08:00
}
2025-02-14 18:18:59 +08:00
if opts . WatchBufferSize == 0 {
opts . WatchBufferSize = defaultWatchBufferSize
}
2024-07-18 23:03:18 +08:00
return & backend {
2025-02-26 23:17:35 +08:00
isHA : opts . IsHA ,
done : ctx . Done ( ) ,
cancel : cancel ,
2025-04-08 21:35:11 +08:00
log : logging . DefaultLogger . With ( "logger" , "sql-resource-server" ) ,
2025-02-26 23:17:35 +08:00
tracer : opts . Tracer ,
2025-03-17 18:36:38 +08:00
reg : opts . Reg ,
2025-02-26 23:17:35 +08:00
dbProvider : opts . DBProvider ,
pollingInterval : opts . PollingInterval ,
watchBufferSize : opts . WatchBufferSize ,
2025-02-28 20:39:39 +08:00
storageMetrics : opts . storageMetrics ,
2025-02-28 13:41:08 +08:00
bulkLock : & bulkLock { running : make ( map [ string ] bool ) } ,
2025-02-26 23:17:35 +08:00
simulatedNetworkLatency : opts . SimulatedNetworkLatency ,
2025-03-17 18:36:38 +08:00
withPruner : opts . withPruner ,
2024-07-18 23:03:18 +08:00
} , nil
}
2025-03-17 18:36:38 +08:00
// pruningKey is a comparable key for pruning history.
type pruningKey struct {
namespace string
group string
resource string
2025-03-19 19:15:04 +08:00
name string
2025-03-17 18:36:38 +08:00
}
// Small abstraction to allow for different pruner implementations.
// This can be removed once the debouncer is deployed.
type pruner interface {
Add ( key pruningKey ) error
Start ( ctx context . Context )
}
type noopPruner struct { }
func ( p * noopPruner ) Add ( key pruningKey ) error {
return nil
}
func ( p * noopPruner ) Start ( ctx context . Context ) { }
2024-07-18 23:03:18 +08:00
type backend struct {
2025-02-21 19:25:35 +08:00
//general
isHA bool
2024-07-23 01:08:30 +08:00
// server lifecycle
done <- chan struct { }
cancel context . CancelFunc
initOnce sync . Once
initErr error
// o11y
2025-04-08 21:35:11 +08:00
log logging . Logger
2025-02-28 20:39:39 +08:00
tracer trace . Tracer
2025-03-17 18:36:38 +08:00
reg prometheus . Registerer
2025-02-28 20:39:39 +08:00
storageMetrics * resource . StorageMetrics
2024-07-23 01:08:30 +08:00
// database
2025-01-23 18:34:48 +08:00
dbProvider db . DBProvider
db db . DB
dialect sqltemplate . Dialect
2025-02-28 13:41:08 +08:00
bulkLock * bulkLock
2024-07-23 01:08:30 +08:00
// watch streaming
2024-07-18 23:03:18 +08:00
//stream chan *resource.WatchEvent
2024-10-07 16:01:53 +08:00
pollingInterval time . Duration
2025-02-14 18:18:59 +08:00
watchBufferSize int
2025-02-21 19:25:35 +08:00
notifier eventNotifier
2025-02-26 23:17:35 +08:00
2025-03-13 16:24:12 +08:00
// resource version manager
rvManager * resourceVersionManager
2025-02-26 23:17:35 +08:00
// testing
simulatedNetworkLatency time . Duration
2025-03-17 18:36:38 +08:00
historyPruner pruner
withPruner bool
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
func ( b * backend ) Init ( ctx context . Context ) error {
b . initOnce . Do ( func ( ) {
b . initErr = b . initLocked ( ctx )
} )
return b . initErr
}
2024-07-18 23:03:18 +08:00
2024-07-23 01:08:30 +08:00
func ( b * backend ) initLocked ( ctx context . Context ) error {
2025-03-17 18:36:38 +08:00
dbConn , err := b . dbProvider . Init ( ctx )
2024-07-18 23:03:18 +08:00
if err != nil {
2024-07-23 01:08:30 +08:00
return fmt . Errorf ( "initialize resource DB: %w" , err )
2024-07-18 23:03:18 +08:00
}
2025-03-17 18:36:38 +08:00
if err := dbConn . PingContext ( ctx ) ; err != nil {
return fmt . Errorf ( "ping resource DB: %w" , err )
}
b . db = dbConn
driverName := dbConn . DriverName ( )
2024-07-23 01:08:30 +08:00
b . dialect = sqltemplate . DialectForDriver ( driverName )
if b . dialect == nil {
2024-07-18 23:03:18 +08:00
return fmt . Errorf ( "no dialect for driver %q" , driverName )
}
2025-03-13 16:24:12 +08:00
// Initialize ResourceVersionManager
rvManager , err := NewResourceVersionManager ( ResourceManagerOptions {
Dialect : b . dialect ,
DB : b . db ,
Tracer : b . tracer ,
} )
if err != nil {
return fmt . Errorf ( "failed to create resource version manager: %w" , err )
}
b . rvManager = rvManager
2025-02-21 19:25:35 +08:00
// Initialize notifier after dialect is set up
notifier , err := newNotifier ( b )
if err != nil {
return fmt . Errorf ( "failed to create notifier: %w" , err )
}
b . notifier = notifier
2025-03-17 18:36:38 +08:00
if err := b . initPruner ( ctx ) ; err != nil {
return fmt . Errorf ( "failed to create pruner: %w" , err )
}
return nil
}
func ( b * backend ) initPruner ( ctx context . Context ) error {
if ! b . withPruner {
b . log . Debug ( "using noop history pruner" )
b . historyPruner = & noopPruner { }
return nil
}
b . log . Debug ( "using debounced history pruner" )
// Initialize history pruner.
pruner , err := debouncer . NewGroup ( debouncer . DebouncerOpts [ pruningKey ] {
Name : "history_pruner" ,
BufferSize : 1000 ,
MinWait : time . Second * 30 ,
MaxWait : time . Minute * 5 ,
ProcessHandler : func ( ctx context . Context , key pruningKey ) error {
return b . db . WithTx ( ctx , ReadCommitted , func ( ctx context . Context , tx db . Tx ) error {
res , err := dbutil . Exec ( ctx , tx , sqlResourceHistoryPrune , & sqlPruneHistoryRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2025-03-25 23:00:12 +08:00
HistoryLimit : defaultPrunerHistoryLimit ,
2025-05-16 03:36:52 +08:00
Key : & resourcepb . ResourceKey {
2025-03-17 18:36:38 +08:00
Namespace : key . namespace ,
Group : key . group ,
Resource : key . resource ,
2025-03-19 19:15:04 +08:00
Name : key . name ,
2025-03-17 18:36:38 +08:00
} ,
} )
if err != nil {
return fmt . Errorf ( "failed to prune history: %w" , err )
}
rows , err := res . RowsAffected ( )
if err != nil {
return fmt . Errorf ( "failed to get rows affected: %w" , err )
}
b . log . Debug ( "pruned history successfully" ,
"namespace" , key . namespace ,
"group" , key . group ,
"resource" , key . resource ,
2025-03-19 19:15:04 +08:00
"name" , key . name ,
2025-03-17 18:36:38 +08:00
"rows" , rows )
return nil
} )
} ,
ErrorHandler : func ( key pruningKey , err error ) {
b . log . Error ( "failed to prune history" ,
"namespace" , key . namespace ,
"group" , key . group ,
"resource" , key . resource ,
2025-03-19 19:15:04 +08:00
"name" , key . name ,
2025-03-17 18:36:38 +08:00
"error" , err )
} ,
Reg : b . reg ,
} )
if err != nil {
return err
}
b . historyPruner = pruner
b . historyPruner . Start ( ctx )
return nil
2024-07-18 23:03:18 +08:00
}
2025-05-16 03:36:52 +08:00
func ( b * backend ) IsHealthy ( ctx context . Context , _ * resourcepb . HealthCheckRequest ) ( * resourcepb . HealthCheckResponse , error ) {
2024-07-18 23:03:18 +08:00
// ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))
2024-07-23 01:08:30 +08:00
if err := b . db . PingContext ( ctx ) ; err != nil {
2024-07-18 23:03:18 +08:00
return nil , err
}
2024-07-23 01:08:30 +08:00
2025-05-16 03:36:52 +08:00
return & resourcepb . HealthCheckResponse { Status : resourcepb . HealthCheckResponse_SERVING } , nil
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
func ( b * backend ) Stop ( _ context . Context ) error {
2024-07-18 23:03:18 +08:00
b . cancel ( )
2024-07-23 01:08:30 +08:00
return nil
2024-07-18 23:03:18 +08:00
}
2024-12-04 01:20:27 +08:00
// GetResourceStats implements Backend.
2024-12-05 18:58:13 +08:00
func ( b * backend ) GetResourceStats ( ctx context . Context , namespace string , minCount int ) ( [ ] resource . ResourceStats , error ) {
2025-05-20 04:25:08 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "GetResourceStats" )
2024-12-04 01:20:27 +08:00
defer span . End ( )
req := & sqlStatsRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2024-12-05 18:58:13 +08:00
Namespace : namespace ,
2024-12-04 01:20:27 +08:00
MinCount : minCount , // not used in query... yet?
}
res := make ( [ ] resource . ResourceStats , 0 , 100 )
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
rows , err := dbutil . QueryRows ( ctx , tx , sqlResourceStats , req )
if err != nil {
return err
}
for rows . Next ( ) {
row := resource . ResourceStats { }
err = rows . Scan ( & row . Namespace , & row . Group , & row . Resource , & row . Count , & row . ResourceVersion )
if err != nil {
return err
}
if row . Count > int64 ( minCount ) {
res = append ( res , row )
2025-06-27 20:00:39 +08:00
} else {
b . log . Debug ( "skipping stats for resource with count less than min count" , "namespace" , row . Namespace , "group" , row . Group , "resource" , row . Resource , "count" , row . Count , "minCount" , minCount )
2024-12-04 01:20:27 +08:00
}
}
return err
} )
return res , err
}
2024-07-18 23:03:18 +08:00
func ( b * backend ) WriteEvent ( ctx context . Context , event resource . WriteEvent ) ( int64 , error ) {
2024-10-10 04:32:09 +08:00
_ , span := b . tracer . Start ( ctx , tracePrefix + "WriteEvent" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
// TODO: validate key ?
switch event . Type {
2025-05-16 03:36:52 +08:00
case resourcepb . WatchEvent_ADDED :
2024-07-18 23:03:18 +08:00
return b . create ( ctx , event )
2025-05-16 03:36:52 +08:00
case resourcepb . WatchEvent_MODIFIED :
2024-07-18 23:03:18 +08:00
return b . update ( ctx , event )
2025-05-16 03:36:52 +08:00
case resourcepb . WatchEvent_DELETED :
2024-07-18 23:03:18 +08:00
return b . delete ( ctx , event )
default :
return 0 , fmt . Errorf ( "unsupported event type" )
}
}
func ( b * backend ) create ( ctx context . Context , event resource . WriteEvent ) ( int64 , error ) {
2024-10-10 04:32:09 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "Create" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
2025-03-13 16:24:12 +08:00
2025-02-21 19:25:35 +08:00
folder := ""
if event . Object != nil {
folder = event . Object . GetFolder ( )
}
2025-03-13 16:24:12 +08:00
rv , err := b . rvManager . ExecWithRV ( ctx , event . Key , func ( tx db . Tx ) ( string , error ) {
2024-07-18 23:03:18 +08:00
// 1. Insert into resource
2024-07-23 01:08:30 +08:00
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceInsert , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2025-05-21 15:49:49 +08:00
GUID : event . GUID ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
2025-05-20 18:48:47 +08:00
if IsRowAlreadyExistsError ( err ) {
2025-05-21 15:49:49 +08:00
return event . GUID , resource . ErrResourceAlreadyExists
2025-03-26 21:44:44 +08:00
}
2025-05-21 15:49:49 +08:00
return event . GUID , fmt . Errorf ( "insert into resource: %w" , err )
2024-07-18 23:03:18 +08:00
}
// 2. Insert into resource history
2024-07-23 01:08:30 +08:00
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceHistoryInsert , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2025-04-02 01:38:23 +08:00
Generation : event . Object . GetGeneration ( ) ,
2025-05-21 15:49:49 +08:00
GUID : event . GUID ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
2025-05-21 15:49:49 +08:00
return event . GUID , fmt . Errorf ( "insert into resource history: %w" , err )
2024-07-18 23:03:18 +08:00
}
2025-03-19 19:15:04 +08:00
_ = b . historyPruner . Add ( pruningKey {
namespace : event . Key . Namespace ,
group : event . Key . Group ,
resource : event . Key . Resource ,
name : event . Key . Name ,
} )
2025-02-26 23:17:35 +08:00
if b . simulatedNetworkLatency > 0 {
time . Sleep ( b . simulatedNetworkLatency )
}
2025-05-21 15:49:49 +08:00
return event . GUID , nil
2024-07-18 23:03:18 +08:00
} )
2025-02-21 19:25:35 +08:00
if err != nil {
return 0 , err
}
b . notifier . send ( ctx , & resource . WrittenEvent {
Type : event . Type ,
Key : event . Key ,
PreviousRV : event . PreviousRV ,
Value : event . Value ,
2025-03-13 16:24:12 +08:00
ResourceVersion : rv ,
2025-02-21 19:25:35 +08:00
Folder : folder ,
} )
2025-03-13 16:24:12 +08:00
return rv , nil
2024-07-18 23:03:18 +08:00
}
2025-05-20 18:48:47 +08:00
// IsRowAlreadyExistsError checks if the error is the result of the row inserted already existing.
func IsRowAlreadyExistsError ( err error ) bool {
2025-07-31 17:25:19 +08:00
if sqlite . IsUniqueConstraintViolation ( err ) {
return true
2025-03-26 21:44:44 +08:00
}
var pg * pgconn . PgError
if errors . As ( err , & pg ) {
// https://www.postgresql.org/docs/current/errcodes-appendix.html
return pg . Code == "23505" // unique_violation
}
2025-03-31 21:06:31 +08:00
var pqerr * pq . Error
if errors . As ( err , & pqerr ) {
// https://www.postgresql.org/docs/current/errcodes-appendix.html
return pqerr . Code == "23505" // unique_violation
}
2025-03-26 21:44:44 +08:00
var mysqlerr * mysql . MySQLError
if errors . As ( err , & mysqlerr ) {
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
return mysqlerr . Number == 1062 // ER_DUP_ENTRY
}
return false
}
2024-07-18 23:03:18 +08:00
func ( b * backend ) update ( ctx context . Context , event resource . WriteEvent ) ( int64 , error ) {
2024-10-10 04:32:09 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "Update" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
2025-05-21 15:49:49 +08:00
2025-02-21 19:25:35 +08:00
folder := ""
if event . Object != nil {
folder = event . Object . GetFolder ( )
}
2025-03-13 16:24:12 +08:00
// Use rvManager.ExecWithRV instead of direct transaction
rv , err := b . rvManager . ExecWithRV ( ctx , event . Key , func ( tx db . Tx ) ( string , error ) {
2024-07-23 01:08:30 +08:00
// 1. Update resource
_ , err := dbutil . Exec ( ctx , tx , sqlResourceUpdate , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2025-05-21 15:49:49 +08:00
GUID : event . GUID ,
2024-07-18 23:03:18 +08:00
} )
if err != nil {
2025-05-21 15:49:49 +08:00
return event . GUID , fmt . Errorf ( "resource update: %w" , err )
2024-07-18 23:03:18 +08:00
}
// 2. Insert into resource history
2024-07-23 01:08:30 +08:00
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceHistoryInsert , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2025-05-21 15:49:49 +08:00
GUID : event . GUID ,
2025-04-02 01:38:23 +08:00
Generation : event . Object . GetGeneration ( ) ,
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
2025-05-21 15:49:49 +08:00
return event . GUID , fmt . Errorf ( "insert into resource history: %w" , err )
2024-07-18 23:03:18 +08:00
}
2025-03-19 19:15:04 +08:00
_ = b . historyPruner . Add ( pruningKey {
namespace : event . Key . Namespace ,
group : event . Key . Group ,
resource : event . Key . Resource ,
name : event . Key . Name ,
} )
2025-05-21 15:49:49 +08:00
return event . GUID , nil
2024-07-18 23:03:18 +08:00
} )
2025-02-21 19:25:35 +08:00
if err != nil {
return 0 , err
}
b . notifier . send ( ctx , & resource . WrittenEvent {
Type : event . Type ,
Key : event . Key ,
PreviousRV : event . PreviousRV ,
Value : event . Value ,
2025-03-13 16:24:12 +08:00
ResourceVersion : rv ,
2025-02-21 19:25:35 +08:00
Folder : folder ,
} )
2025-03-13 16:24:12 +08:00
return rv , nil
2024-07-18 23:03:18 +08:00
}
func ( b * backend ) delete ( ctx context . Context , event resource . WriteEvent ) ( int64 , error ) {
2024-10-10 04:32:09 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "Delete" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
2025-05-21 15:49:49 +08:00
2025-02-21 19:25:35 +08:00
folder := ""
if event . Object != nil {
folder = event . Object . GetFolder ( )
}
2025-03-13 16:24:12 +08:00
rv , err := b . rvManager . ExecWithRV ( ctx , event . Key , func ( tx db . Tx ) ( string , error ) {
2024-07-18 23:03:18 +08:00
// 1. delete from resource
2024-07-23 01:08:30 +08:00
_ , err := dbutil . Exec ( ctx , tx , sqlResourceDelete , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2025-05-21 15:49:49 +08:00
GUID : event . GUID ,
2024-07-18 23:03:18 +08:00
} )
if err != nil {
2025-05-21 15:49:49 +08:00
return event . GUID , fmt . Errorf ( "delete resource: %w" , err )
2024-07-18 23:03:18 +08:00
}
2024-07-23 01:08:30 +08:00
// 2. Add event to resource history
if _ , err := dbutil . Exec ( ctx , tx , sqlResourceHistoryInsert , sqlResourceRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
WriteEvent : event ,
2024-11-12 19:52:04 +08:00
Folder : folder ,
2025-05-21 15:49:49 +08:00
GUID : event . GUID ,
2025-04-02 01:38:23 +08:00
Generation : 0 , // object does not exist
2024-07-18 23:03:18 +08:00
} ) ; err != nil {
2025-05-21 15:49:49 +08:00
return event . GUID , fmt . Errorf ( "insert into resource history: %w" , err )
2024-07-18 23:03:18 +08:00
}
2025-03-19 19:15:04 +08:00
_ = b . historyPruner . Add ( pruningKey {
namespace : event . Key . Namespace ,
group : event . Key . Group ,
resource : event . Key . Resource ,
name : event . Key . Name ,
} )
2025-05-21 15:49:49 +08:00
return event . GUID , nil
2024-07-18 23:03:18 +08:00
} )
2025-02-21 19:25:35 +08:00
if err != nil {
return 0 , err
}
b . notifier . send ( ctx , & resource . WrittenEvent {
Type : event . Type ,
Key : event . Key ,
PreviousRV : event . PreviousRV ,
Value : event . Value ,
2025-03-13 16:24:12 +08:00
ResourceVersion : rv ,
2025-02-21 19:25:35 +08:00
Folder : folder ,
} )
2025-03-13 16:24:12 +08:00
return rv , nil
2024-07-18 23:03:18 +08:00
}
2025-05-16 03:36:52 +08:00
func ( b * backend ) ReadResource ( ctx context . Context , req * resourcepb . ReadRequest ) * resource . BackendReadResponse {
2024-10-10 04:32:09 +08:00
_ , span := b . tracer . Start ( ctx , tracePrefix + ".Read" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
// TODO: validate key ?
2025-03-28 01:34:37 +08:00
if req . ResourceVersion > 0 {
return b . readHistory ( ctx , req . Key , req . ResourceVersion )
}
2024-08-20 20:29:06 +08:00
readReq := & sqlResourceReadRequest {
2024-11-12 19:52:04 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
Request : req ,
Response : NewReadResponse ( ) ,
2024-07-18 23:03:18 +08:00
}
2024-11-12 19:52:04 +08:00
var res * resource . BackendReadResponse
2024-08-20 20:29:06 +08:00
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
var err error
2025-03-28 01:34:37 +08:00
res , err = dbutil . QueryRow ( ctx , tx , sqlResourceRead , readReq )
2024-08-20 20:29:06 +08:00
return err
} )
2024-12-14 06:55:43 +08:00
2024-07-18 23:03:18 +08:00
if errors . Is ( err , sql . ErrNoRows ) {
2024-11-12 19:52:04 +08:00
return & resource . BackendReadResponse {
2024-07-30 18:16:16 +08:00
Error : resource . NewNotFoundError ( req . Key ) ,
}
2024-07-18 23:03:18 +08:00
} else if err != nil {
2024-11-12 19:52:04 +08:00
return & resource . BackendReadResponse { Error : resource . AsErrorResult ( err ) }
2024-07-18 23:03:18 +08:00
}
2024-11-12 19:52:04 +08:00
return res
2024-07-18 23:03:18 +08:00
}
2025-05-16 03:36:52 +08:00
func ( b * backend ) ListIterator ( ctx context . Context , req * resourcepb . ListRequest , cb func ( resource . ListIterator ) error ) ( int64 , error ) {
2025-01-17 20:54:25 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "List" )
2024-07-18 23:03:18 +08:00
defer span . End ( )
2025-03-25 22:28:24 +08:00
if err := resource . MigrateListRequestVersionMatch ( req , b . log ) ; err != nil {
return 0 , err
}
2024-07-22 21:07:12 +08:00
if req . Options == nil || req . Options . Key . Group == "" || req . Options . Key . Resource == "" {
2024-07-31 17:05:59 +08:00
return 0 , fmt . Errorf ( "missing group or resource" )
2024-07-22 21:07:12 +08:00
}
2024-07-18 23:03:18 +08:00
// TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only).
2024-07-23 01:08:30 +08:00
// TODO: add support for RemainingItemCount
2024-07-18 23:03:18 +08:00
if req . ResourceVersion > 0 || req . NextPageToken != "" {
2024-07-31 17:05:59 +08:00
return b . listAtRevision ( ctx , req , cb )
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
return b . listLatest ( ctx , req , cb )
}
2025-05-23 21:00:18 +08:00
func ( b * backend ) ListHistory ( ctx context . Context , req * resourcepb . ListRequest , cb func ( resource . ListIterator ) error ) ( int64 , error ) {
ctx , span := b . tracer . Start ( ctx , tracePrefix + "ListHistory" )
defer span . End ( )
return b . getHistory ( ctx , req , cb )
}
2024-07-18 23:03:18 +08:00
// listLatest fetches the resources from the resource table.
2025-05-16 03:36:52 +08:00
func ( b * backend ) listLatest ( ctx context . Context , req * resourcepb . ListRequest , cb func ( resource . ListIterator ) error ) ( int64 , error ) {
2025-03-26 19:50:29 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "listLatest" )
defer span . End ( )
2024-07-31 17:05:59 +08:00
if req . NextPageToken != "" {
return 0 , fmt . Errorf ( "only works for the first page" )
}
if req . ResourceVersion > 0 {
return 0 , fmt . Errorf ( "only works for the 'latest' resource version" )
2024-07-18 23:03:18 +08:00
}
2025-08-12 23:29:50 +08:00
iter := & listIter { sortAsc : false }
2024-07-23 01:08:30 +08:00
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
2024-07-18 23:03:18 +08:00
var err error
2025-03-26 19:50:29 +08:00
iter . listRV , err = b . fetchLatestRV ( ctx , tx , b . dialect , req . Options . Key . Group , req . Options . Key . Resource )
2024-07-18 23:03:18 +08:00
if err != nil {
return err
}
listReq := sqlResourceListRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2025-05-16 03:36:52 +08:00
Request : new ( resourcepb . ListRequest ) ,
2024-07-18 23:03:18 +08:00
}
2025-05-16 03:36:52 +08:00
listReq . Request = proto . Clone ( req ) . ( * resourcepb . ListRequest )
2024-07-18 23:03:18 +08:00
2024-07-31 17:05:59 +08:00
rows , err := dbutil . QueryRows ( ctx , tx , sqlResourceList , listReq )
if rows != nil {
defer func ( ) {
if err := rows . Close ( ) ; err != nil {
b . log . Warn ( "listLatest error closing rows" , "error" , err )
}
} ( )
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
if err != nil {
return err
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
iter . rows = rows
return cb ( iter )
2024-07-18 23:03:18 +08:00
} )
2024-07-31 17:05:59 +08:00
return iter . listRV , err
2024-07-18 23:03:18 +08:00
}
2025-08-21 01:54:31 +08:00
// ListModifiedSince will return all resources that have changed since the given resource version.
// If a resource has changes, only the latest change will be returned.
func ( b * backend ) ListModifiedSince ( ctx context . Context , key resource . NamespacedResource , sinceRv int64 ) ( int64 , iter . Seq2 [ * resource . ModifiedResource , error ] ) {
tx , err := b . db . BeginTx ( ctx , RepeatableRead )
if err != nil {
return 0 , func ( yield func ( * resource . ModifiedResource , error ) bool ) {
yield ( nil , err )
}
}
2025-08-29 20:49:20 +08:00
rollbackOnDefer := true
defer func ( ) {
if rollbackOnDefer {
if terr := tx . Rollback ( ) ; terr != nil {
b . log . Warn ( "Error rolling back transaction in ListModifiedSince" , "error" , terr )
}
}
} ( )
2025-08-21 01:54:31 +08:00
// Fetch latest RV within the transaction
latestRv , err := b . fetchLatestRV ( ctx , tx , b . dialect , key . Group , key . Resource )
if err != nil {
return 0 , func ( yield func ( * resource . ModifiedResource , error ) bool ) {
yield ( nil , err )
}
}
2025-08-29 20:49:20 +08:00
// If latest RV is the same as request RV, there's nothing to report, and we can avoid running another query.
if latestRv == sinceRv {
return 0 , func ( yield func ( * resource . ModifiedResource , error ) bool ) { /* nothing to return */ }
}
2025-08-21 01:54:31 +08:00
// since results are sorted by name ASC and rv DESC, we can get away with tracking the last seen
lastSeen := ""
2025-08-29 20:49:20 +08:00
// We will rollback after iteration has finished.
rollbackOnDefer = false
2025-08-21 01:54:31 +08:00
// rollback transaction if iterator not called within 30 seconds
rollbackTimer := time . AfterFunc ( 30 * time . Second , func ( ) {
if err := tx . Rollback ( ) ; err != nil && ! errors . Is ( err , sql . ErrTxDone ) {
b . log . Warn ( "rollback timer error" , "err" , err )
}
} )
seq := func ( yield func ( * resource . ModifiedResource , error ) bool ) {
rollbackTimer . Stop ( )
defer func ( ) {
// Always rollback the read-only transaction when iterator is done
if rollbackErr := tx . Rollback ( ) ; rollbackErr != nil {
b . log . Warn ( "Error rolling back transaction in ListModifiedSince" , "error" , rollbackErr )
}
} ( )
query := sqlResourceListModifiedSinceRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
Namespace : key . Namespace ,
Group : key . Group ,
Resource : key . Resource ,
SinceRv : sinceRv ,
}
rows , err := dbutil . QueryRows ( ctx , tx , sqlResourceHistoryListModifiedSince , query )
if err != nil {
yield ( nil , err )
return
}
if rows != nil {
defer func ( ) {
if cerr := rows . Close ( ) ; cerr != nil {
b . log . Warn ( "listSinceModified error closing rows" , "error" , cerr )
}
} ( )
}
for rows . Next ( ) {
mr := & resource . ModifiedResource { }
if err := rows . Scan ( & mr . Key . Namespace , & mr . Key . Group , & mr . Key . Resource , & mr . Key . Name , & mr . ResourceVersion , & mr . Action , & mr . Value ) ; err != nil {
if ! yield ( nil , err ) {
return
}
continue
}
// Deduplicate by name (namespace, group, and resource are always the same in the result set)
if mr . Key . Name == lastSeen {
continue
}
lastSeen = mr . Key . Name
if ! yield ( mr , nil ) {
return
}
}
}
return latestRv , seq
}
2024-07-18 23:03:18 +08:00
// listAtRevision fetches the resources from the resource_history table at a specific revision.
2025-05-16 03:36:52 +08:00
func ( b * backend ) listAtRevision ( ctx context . Context , req * resourcepb . ListRequest , cb func ( resource . ListIterator ) error ) ( int64 , error ) {
2025-03-22 04:20:27 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "listAtRevision" )
defer span . End ( )
2024-07-18 23:03:18 +08:00
// Get the RV
2025-08-12 23:29:50 +08:00
iter := & listIter { listRV : req . ResourceVersion , sortAsc : false }
2024-07-18 23:03:18 +08:00
if req . NextPageToken != "" {
2025-02-25 00:02:30 +08:00
continueToken , err := resource . GetContinueToken ( req . NextPageToken )
2024-07-18 23:03:18 +08:00
if err != nil {
2025-06-10 22:41:39 +08:00
return 0 , fmt . Errorf ( "get continue token (%q): %w" , req . NextPageToken , err )
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
iter . listRV = continueToken . ResourceVersion
iter . offset = continueToken . StartOffset
2024-07-18 23:03:18 +08:00
2024-07-31 17:05:59 +08:00
if req . ResourceVersion != 0 && req . ResourceVersion != iter . listRV {
return 0 , apierrors . NewBadRequest ( "request resource version does not math token" )
}
}
if iter . listRV < 1 {
return 0 , apierrors . NewBadRequest ( "expecting an explicit resource version query" )
2024-07-18 23:03:18 +08:00
}
2025-03-22 04:20:27 +08:00
// The query below has the potential to be EXTREMELY slow if the resource_history table is big. May be helpful to know
// which stack is calling this.
b . log . Debug ( "listAtRevision" , "ns" , req . Options . Key . Namespace , "group" , req . Options . Key . Group , "resource" , req . Options . Key . Resource , "rv" , iter . listRV )
2024-07-23 01:08:30 +08:00
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
2024-07-31 17:05:59 +08:00
limit := int64 ( 0 ) // ignore limit
if iter . offset > 0 {
limit = math . MaxInt64 // a limit is required for offset
}
2024-07-18 23:03:18 +08:00
listReq := sqlResourceHistoryListRequest {
2024-08-16 20:12:37 +08:00
SQLTemplate : sqltemplate . New ( b . dialect ) ,
2024-07-18 23:03:18 +08:00
Request : & historyListRequest {
2024-07-31 17:05:59 +08:00
ResourceVersion : iter . listRV ,
Limit : limit ,
Offset : iter . offset ,
2024-07-18 23:03:18 +08:00
Options : req . Options ,
} ,
}
2024-07-23 01:08:30 +08:00
2024-07-31 17:05:59 +08:00
rows , err := dbutil . QueryRows ( ctx , tx , sqlResourceHistoryList , listReq )
if rows != nil {
defer func ( ) {
if err := rows . Close ( ) ; err != nil {
b . log . Warn ( "listAtRevision error closing rows" , "error" , err )
}
} ( )
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
if err != nil {
return err
2024-07-18 23:03:18 +08:00
}
2024-07-31 17:05:59 +08:00
iter . rows = rows
return cb ( iter )
2024-07-18 23:03:18 +08:00
} )
2024-07-31 17:05:59 +08:00
return iter . listRV , err
2024-07-18 23:03:18 +08:00
}
2025-03-28 01:34:37 +08:00
// readHistory fetches the resource history from the resource_history table.
2025-05-16 03:36:52 +08:00
func ( b * backend ) readHistory ( ctx context . Context , key * resourcepb . ResourceKey , rv int64 ) * resource . BackendReadResponse {
2025-03-28 01:34:37 +08:00
_ , span := b . tracer . Start ( ctx , tracePrefix + ".ReadHistory" )
defer span . End ( )
readReq := & sqlResourceHistoryReadRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
Request : & historyReadRequest {
Key : key ,
ResourceVersion : rv ,
} ,
Response : NewReadResponse ( ) ,
}
var res * resource . BackendReadResponse
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
var err error
res , err = dbutil . QueryRow ( ctx , tx , sqlResourceHistoryRead , readReq )
return err
} )
if errors . Is ( err , sql . ErrNoRows ) {
return & resource . BackendReadResponse { Error : resource . NewNotFoundError ( key ) }
}
if err != nil {
return & resource . BackendReadResponse { Error : resource . AsErrorResult ( err ) }
}
return res
}
// getHistory fetches the resource history from the resource_history table.
2025-05-16 03:36:52 +08:00
func ( b * backend ) getHistory ( ctx context . Context , req * resourcepb . ListRequest , cb func ( resource . ListIterator ) error ) ( int64 , error ) {
2025-03-26 19:50:29 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "getHistory" )
defer span . End ( )
2025-01-17 20:54:25 +08:00
listReq := sqlGetHistoryRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
Key : req . Options . Key ,
2025-05-16 03:36:52 +08:00
Trash : req . Source == resourcepb . ListRequest_TRASH ,
2025-01-17 20:54:25 +08:00
}
2025-03-22 00:35:32 +08:00
// We are assuming that users want history in ascending order
// when they are using NotOlderThan matching, and descending order
// for Unset (default) and Exact matching.
2025-05-16 03:36:52 +08:00
listReq . SortAscending = req . GetVersionMatchV2 ( ) == resourcepb . ResourceVersionMatchV2_NotOlderThan
2025-03-22 00:35:32 +08:00
2025-05-23 21:00:18 +08:00
iter := & listIter {
useCurrentRV : true , // use the current RV for the continue token instead of the listRV
}
2025-01-17 20:54:25 +08:00
if req . NextPageToken != "" {
2025-02-25 00:02:30 +08:00
continueToken , err := resource . GetContinueToken ( req . NextPageToken )
2025-01-17 20:54:25 +08:00
if err != nil {
2025-06-10 22:41:39 +08:00
return 0 , fmt . Errorf ( "get continue token (%q): %w" , req . NextPageToken , err )
2025-01-17 20:54:25 +08:00
}
listReq . StartRV = continueToken . ResourceVersion
2025-03-22 00:35:32 +08:00
listReq . SortAscending = continueToken . SortAscending
2025-01-17 20:54:25 +08:00
}
2025-03-22 00:35:32 +08:00
iter . sortAsc = listReq . SortAscending
2025-01-17 20:54:25 +08:00
2025-03-19 23:16:48 +08:00
// Set ExactRV when using Exact matching
2025-05-16 03:36:52 +08:00
if req . VersionMatchV2 == resourcepb . ResourceVersionMatchV2_Exact {
2025-03-19 23:16:48 +08:00
if req . ResourceVersion <= 0 {
return 0 , fmt . Errorf ( "expecting an explicit resource version query when using Exact matching" )
}
listReq . ExactRV = req . ResourceVersion
}
// Set MinRV when using NotOlderThan matching to filter at the database level
2025-05-16 03:36:52 +08:00
if req . ResourceVersion > 0 && req . VersionMatchV2 == resourcepb . ResourceVersionMatchV2_NotOlderThan {
2025-03-19 23:16:48 +08:00
listReq . MinRV = req . ResourceVersion
}
2025-03-28 01:34:37 +08:00
// Ignore last deleted history record when listing the trash, using exact matching or not older than matching with a specific RV
2025-05-16 03:36:52 +08:00
useLatestDeletionAsMinRV := listReq . MinRV == 0 && ! listReq . Trash && req . VersionMatchV2 != resourcepb . ResourceVersionMatchV2_Exact
2025-03-28 01:34:37 +08:00
2025-01-17 20:54:25 +08:00
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
var err error
2025-03-26 19:50:29 +08:00
iter . listRV , err = b . fetchLatestRV ( ctx , tx , b . dialect , req . Options . Key . Group , req . Options . Key . Resource )
2025-01-17 20:54:25 +08:00
if err != nil {
return err
}
2025-03-28 01:34:37 +08:00
if useLatestDeletionAsMinRV {
2025-05-16 03:36:52 +08:00
latestDeletedRV , err := b . fetchLatestHistoryRV ( ctx , tx , b . dialect , req . Options . Key , resourcepb . WatchEvent_DELETED )
2025-03-28 01:34:37 +08:00
if err != nil {
return err
}
listReq . MinRV = latestDeletedRV + 1
}
2025-06-12 02:54:38 +08:00
var rows db . Rows
if listReq . Trash {
// unlike history, trash will not return an object if an object of the same name is live
// (i.e. in the resource table)
rows , err = dbutil . QueryRows ( ctx , tx , sqlResourceTrash , listReq )
} else {
rows , err = dbutil . QueryRows ( ctx , tx , sqlResourceHistoryGet , listReq )
}
2025-01-17 20:54:25 +08:00
if rows != nil {
defer func ( ) {
if err := rows . Close ( ) ; err != nil {
b . log . Warn ( "listLatest error closing rows" , "error" , err )
}
} ( )
}
if err != nil {
return err
}
iter . rows = rows
return cb ( iter )
} )
return iter . listRV , err
}
2024-07-18 23:03:18 +08:00
func ( b * backend ) WatchWriteEvents ( ctx context . Context ) ( <- chan * resource . WrittenEvent , error ) {
2025-02-21 19:25:35 +08:00
return b . notifier . notify ( ctx )
2024-07-18 23:03:18 +08:00
}
2024-07-22 21:07:12 +08:00
// listLatestRVs returns the latest resource version for each (Group, Resource) pair.
func ( b * backend ) listLatestRVs ( ctx context . Context ) ( groupResourceRV , error ) {
2024-10-25 03:49:38 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "listLatestRVs" )
defer span . End ( )
2024-08-20 20:29:06 +08:00
var grvs [ ] * groupResourceVersion
err := b . db . WithTx ( ctx , ReadCommittedRO , func ( ctx context . Context , tx db . Tx ) error {
var err error
grvs , err = dbutil . Query ( ctx , tx , sqlResourceVersionList , & sqlResourceVersionListRequest {
SQLTemplate : sqltemplate . New ( b . dialect ) ,
groupResourceVersion : new ( groupResourceVersion ) ,
} )
return err
} )
2024-07-18 23:03:18 +08:00
if err != nil {
2024-08-20 20:29:06 +08:00
return nil , err
2024-07-18 23:03:18 +08:00
}
2024-07-22 21:07:12 +08:00
2024-08-20 20:29:06 +08:00
since := groupResourceRV { }
for _ , grv := range grvs {
if since [ grv . Group ] == nil {
since [ grv . Group ] = map [ string ] int64 { }
2024-07-22 21:07:12 +08:00
}
2024-08-20 20:29:06 +08:00
since [ grv . Group ] [ grv . Resource ] = grv . ResourceVersion
2024-07-18 23:03:18 +08:00
}
2024-08-20 20:29:06 +08:00
2024-07-22 21:07:12 +08:00
return since , nil
2024-07-18 23:03:18 +08:00
}
2024-07-22 21:07:12 +08:00
// fetchLatestRV returns the current maximum RV in the resource table
2025-03-26 19:50:29 +08:00
func ( b * backend ) fetchLatestRV ( ctx context . Context , x db . ContextExecer , d sqltemplate . Dialect , group , resource string ) ( int64 , error ) {
ctx , span := b . tracer . Start ( ctx , tracePrefix + "fetchLatestRV" )
defer span . End ( )
2024-10-11 17:11:33 +08:00
res , err := dbutil . QueryRow ( ctx , x , sqlResourceVersionGet , sqlResourceVersionGetRequest {
SQLTemplate : sqltemplate . New ( d ) ,
Group : group ,
Resource : resource ,
ReadOnly : true ,
Response : new ( resourceVersionResponse ) ,
2024-07-22 21:07:12 +08:00
} )
if errors . Is ( err , sql . ErrNoRows ) {
2024-07-26 00:17:39 +08:00
return 1 , nil
2024-07-22 21:07:12 +08:00
} else if err != nil {
return 0 , fmt . Errorf ( "get resource version: %w" , err )
}
return res . ResourceVersion , nil
}
2025-03-28 01:34:37 +08:00
// fetchLatestHistoryRV returns the current maximum RV in the resource_history table
2025-05-16 03:36:52 +08:00
func ( b * backend ) fetchLatestHistoryRV ( ctx context . Context , x db . ContextExecer , d sqltemplate . Dialect , key * resourcepb . ResourceKey , eventType resourcepb . WatchEvent_Type ) ( int64 , error ) {
2025-03-28 01:34:37 +08:00
ctx , span := b . tracer . Start ( ctx , tracePrefix + "fetchLatestHistoryRV" )
defer span . End ( )
res , err := dbutil . QueryRow ( ctx , x , sqlResourceHistoryReadLatestRV , sqlResourceHistoryReadLatestRVRequest {
SQLTemplate : sqltemplate . New ( d ) ,
Request : & historyReadLatestRVRequest {
Key : key ,
EventType : eventType ,
} ,
Response : new ( resourceHistoryReadLatestRVResponse ) ,
} )
if errors . Is ( err , sql . ErrNoRows ) {
return 0 , nil
} else if err != nil {
return 0 , fmt . Errorf ( "get resource version: %w" , err )
}
return res . ResourceVersion , nil
}