Retry aborted transactions on Spanner. (#103289)

* Retry aborted transactions on Spanner.
This commit is contained in:
Peter Štibraný 2025-04-03 16:26:09 +02:00 committed by GitHub
parent f5beba1036
commit 413378dd3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 69 additions and 39 deletions

View File

@ -2,15 +2,14 @@ package sqlstore
import (
"context"
"errors"
"fmt"
"reflect"
"time"
"github.com/mattn/go-sqlite3"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"xorm.io/core"
"xorm.io/xorm"
@ -76,12 +75,12 @@ func startSessionOrUseExisting(ctx context.Context, engine *xorm.Engine, beginTr
// WithDbSession calls the callback with the session in the context (if exists).
// Otherwise it creates a new one that is closed upon completion.
// A session is stored in the context if sqlstore.InTransaction() has been previously called with the same context (and it's not committed/rolledback yet).
// In case of sqlite3.ErrLocked or sqlite3.ErrBusy failure it will be retried at most five times before giving up.
// In case of retryable errors (as reported by the dialect), the callback is retried until the configured QueryRetries limit is reached.
func (ss *SQLStore) WithDbSession(ctx context.Context, callback DBTransactionFunc) error {
// Delegate to the shared implementation, running against the store's own engine.
return ss.withDbSession(ctx, ss.engine, callback)
}
func (ss *SQLStore) retryOnLocks(ctx context.Context, callback DBTransactionFunc, sess *DBSession, retry int) func() (retryer.RetrySignal, error) {
func (ss *SQLStore) retryOnLocks(ctx context.Context, callback DBTransactionFunc, sess *DBSession, retry int, dialect core.Dialect) func() (retryer.RetrySignal, error) {
return func() (retryer.RetrySignal, error) {
retry++
@ -89,9 +88,8 @@ func (ss *SQLStore) retryOnLocks(ctx context.Context, callback DBTransactionFunc
ctxLogger := tsclogger.FromContext(ctx)
var sqlError sqlite3.Error
if errors.As(err, &sqlError) && (sqlError.Code == sqlite3.ErrLocked || sqlError.Code == sqlite3.ErrBusy) {
ctxLogger.Info("Database locked, sleeping then retrying", "error", err, "retry", retry, "code", sqlError.Code)
if r, ok := dialect.(xorm.DialectWithRetryableErrors); ok && r.RetryOnError(err) {
ctxLogger.Info("Database locked, sleeping then retrying", "error", err, "retry", retry, "code")
// retryer immediately returns the error (if there is one) without checking the response
// therefore we only have to send it if we have reached the maximum retries
if retry >= ss.dbCfg.QueryRetries {
@ -120,7 +118,7 @@ func (ss *SQLStore) withDbSession(ctx context.Context, engine *xorm.Engine, call
defer sess.Close()
}
retry := 0
return retryer.Retry(ss.retryOnLocks(ctx, callback, sess, retry), ss.dbCfg.QueryRetries, time.Millisecond*time.Duration(10), time.Second)
return retryer.Retry(ss.retryOnLocks(ctx, callback, sess, retry, engine.Dialect()), ss.dbCfg.QueryRetries, time.Millisecond*time.Duration(10), time.Second)
}
func (sess *DBSession) InsertId(bean any, dialect migrator.Dialect) error {

View File

@ -7,12 +7,17 @@ import (
"testing"
"github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
)
func TestRetryingDisabled(t *testing.T) {
func TestIntegration_RetryingDisabled(t *testing.T) {
store, _ := InitTestDB(t)
retryErrors := getRetryErrors(t, store)
require.Equal(t, 0, store.dbCfg.QueryRetries)
funcToTest := map[string]func(ctx context.Context, callback DBTransactionFunc) error{
@ -31,20 +36,16 @@ func TestRetryingDisabled(t *testing.T) {
require.Equal(t, 1, i)
})
errCodes := []sqlite3.ErrNo{sqlite3.ErrBusy, sqlite3.ErrLocked}
for _, c := range errCodes {
t.Run(fmt.Sprintf("%s should return the sqlite3.Error %v immediately", name, c.Error()), func(t *testing.T) {
for _, e := range retryErrors {
t.Run(fmt.Sprintf("%s should return the sqlite3.Error %v immediately", name, e), func(t *testing.T) {
i := 0
callback := func(sess *DBSession) error {
i++
return sqlite3.Error{Code: c}
return e
}
err := f(context.Background(), callback)
require.Error(t, err)
var driverErr sqlite3.Error
require.ErrorAs(t, err, &driverErr)
require.Equal(t, 1, i)
assert.Equal(t, c, driverErr.Code)
})
}
@ -61,8 +62,9 @@ func TestRetryingDisabled(t *testing.T) {
}
}
func TestRetryingOnFailures(t *testing.T) {
func TestIntegration_RetryingOnFailures(t *testing.T) {
store, _ := InitTestDB(t)
retryErrors := getRetryErrors(t, store)
store.dbCfg.QueryRetries = 5
funcToTest := map[string]func(ctx context.Context, callback DBTransactionFunc) error{
@ -81,20 +83,16 @@ func TestRetryingOnFailures(t *testing.T) {
require.Equal(t, 1, i)
})
errCodes := []sqlite3.ErrNo{sqlite3.ErrBusy, sqlite3.ErrLocked}
for _, c := range errCodes {
t.Run(fmt.Sprintf("%s should return the sqlite3.Error %v if all retries have failed", name, c.Error()), func(t *testing.T) {
for _, e := range retryErrors {
t.Run(fmt.Sprintf("%s should return the error %v if all retries have failed", name, e), func(t *testing.T) {
i := 0
callback := func(sess *DBSession) error {
i++
return sqlite3.Error{Code: c}
return e
}
err := f(context.Background(), callback)
require.Error(t, err)
var driverErr sqlite3.Error
require.ErrorAs(t, err, &driverErr)
require.Equal(t, store.dbCfg.QueryRetries, i)
assert.Equal(t, c, driverErr.Code)
})
}
@ -107,7 +105,7 @@ func TestRetryingOnFailures(t *testing.T) {
case store.dbCfg.QueryRetries == i:
err = nil
default:
err = sqlite3.Error{Code: sqlite3.ErrBusy}
err = retryErrors[0]
}
return err
}
@ -137,3 +135,18 @@ func TestRetryingOnFailures(t *testing.T) {
require.Equal(t, int64(4), val3)
require.False(t, rows.Next()) // no more rows
}
// getRetryErrors returns sample errors that the store's dialect is expected to
// treat as retryable: busy/locked errors for SQLite and an Aborted gRPC status
// for Spanner. For any other driver the calling test is skipped.
func getRetryErrors(t *testing.T, store *SQLStore) []error {
	switch driver := store.GetDialect().DriverName(); driver {
	case migrator.SQLite:
		return []error{
			sqlite3.Error{Code: sqlite3.ErrBusy},
			sqlite3.Error{Code: sqlite3.ErrLocked},
		}
	case migrator.Spanner:
		return []error{
			grpcstatus.Error(codes.Aborted, "aborted transaction"),
		}
	default:
		// t.Skip stops the test goroutine, so the return below is never reached;
		// it only satisfies the compiler.
		t.Skip("This test only works with sqlite or spanner")
		return nil
	}
}

View File

@ -2,11 +2,9 @@ package sqlstore
import (
"context"
"errors"
"fmt"
"time"
"github.com/mattn/go-sqlite3"
"xorm.io/xorm"
"github.com/grafana/grafana/pkg/bus"
@ -63,16 +61,17 @@ func (ss *SQLStore) inTransactionWithRetryCtx(ctx context.Context, engine *xorm.
return err
}
// special handling of database locked errors for sqlite, then we can retry 5 times
var sqlError sqlite3.Error
if errors.As(err, &sqlError) && retry < ss.dbCfg.TransactionRetries && (sqlError.Code == sqlite3.ErrLocked || sqlError.Code == sqlite3.ErrBusy) {
if rollErr := sess.Rollback(); rollErr != nil {
return fmt.Errorf("rolling back transaction due to error failed: %s: %w", rollErr, err)
}
// special handling of database locked errors for sqlite and spanner, then we can retry 5 times
if r, ok := engine.Dialect().(xorm.DialectWithRetryableErrors); ok {
if retry < ss.dbCfg.TransactionRetries && r.RetryOnError(err) {
if rollErr := sess.Rollback(); rollErr != nil {
return fmt.Errorf("rolling back transaction due to error failed: %s: %w", rollErr, err)
}
time.Sleep(time.Millisecond * time.Duration(10))
ctxLogger.Info("Database locked, sleeping then retrying", "error", err, "retry", retry, "code", sqlError.Code)
return ss.inTransactionWithRetryCtx(ctx, engine, bus, callback, retry+1)
time.Sleep(time.Millisecond * time.Duration(10))
ctxLogger.Info("Database locked, sleeping then retrying", "error", err, "retry", retry)
return ss.inTransactionWithRetryCtx(ctx, engine, bus, callback, retry+1)
}
}
if err != nil {

View File

@ -8,10 +8,12 @@ import (
"strconv"
"strings"
spannerclient "cloud.google.com/go/spanner"
_ "github.com/googleapis/go-sql-spanner"
spannerdriver "github.com/googleapis/go-sql-spanner"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"xorm.io/core"
)
@ -425,3 +427,7 @@ func SpannerConnectorConfigToClientOptions(connectorConfig spannerdriver.Connect
}
return opts
}
// RetryOnError reports whether err is non-nil and, once converted to a Spanner
// error, carries the gRPC status code Aborted.
func (s *spanner) RetryOnError(err error) bool {
	if err == nil {
		return false
	}

	code := spannerclient.ErrCode(spannerclient.ToSpannerError(err))
	return code == codes.Aborted
}

View File

@ -11,6 +11,7 @@ import (
"regexp"
"strings"
sqlite "github.com/mattn/go-sqlite3"
"xorm.io/core"
)
@ -474,6 +475,14 @@ func (db *sqlite3) Filters() []core.Filter {
return []core.Filter{&core.IdFilter{}}
}
// RetryOnError reports whether err is, or wraps, a sqlite Error whose code is
// ErrLocked or ErrBusy.
func (db *sqlite3) RetryOnError(err error) bool {
	var driverErr sqlite.Error
	if !errors.As(err, &driverErr) {
		return false
	}

	return driverErr.Code == sqlite.ErrLocked || driverErr.Code == sqlite.ErrBusy
}
type sqlite3Driver struct {
}

View File

@ -117,7 +117,7 @@ func NewEngine(driverName string, dataSourceName string) (*Engine, error) {
runtime.SetFinalizer(engine, close)
if ext, ok := dialect.(DialectExt); ok {
if ext, ok := dialect.(DialectWithSequenceGenerator); ok {
engine.sequenceGenerator, err = ext.CreateSequenceGenerator(db.DB)
if err != nil {
return nil, fmt.Errorf("failed to create sequence generator: %w", err)
@ -138,9 +138,14 @@ type SequenceGenerator interface {
Reset()
}
type DialectExt interface {
// DialectWithSequenceGenerator is an optional extension of core.Dialect.
// NewEngine checks for it with a type assertion and, when the dialect
// implements it, stores the generator it creates on the engine.
type DialectWithSequenceGenerator interface {
core.Dialect
// CreateSequenceGenerator returns optional generator used to create AUTOINCREMENT ids for inserts.
CreateSequenceGenerator(db *sql.DB) (SequenceGenerator, error)
}
// DialectWithRetryableErrors is an optional extension of core.Dialect for
// dialects that can tell callers whether a failed operation is worth retrying.
// Callers discover it with a type assertion on the engine's dialect.
type DialectWithRetryableErrors interface {
core.Dialect
// RetryOnError reports whether the operation that produced err should be retried.
RetryOnError(err error) bool
}