fix: enforce timeout for requests run in qos (#110162)

Mustafa Sencer Özcan 2025-08-26 18:40:22 +02:00 committed by GitHub
parent 5e1f79ca83
commit 3056924d10
2 changed files with 173 additions and 46 deletions
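
In short, this change drops the package-level backoff constants in favor of a per-server QueueConfig and bounds every request that goes through the QoS queue with a timeout: the request context is wrapped with context.WithTimeout, the runnable signals completion through a channel, and the caller returns on whichever of completion or deadline comes first. The snippet below is a minimal, self-contained sketch of that pattern; runWithTimeout, the plain goroutine standing in for the queue worker, and the sample timings are illustrative only, not the Grafana API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout mirrors the shape of the new runInQueue: the work function
// receives the deadline-bound context, and completion is signalled by closing done.
func runWithTimeout(parent context.Context, timeout time.Duration, work func(ctx context.Context)) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	done := make(chan struct{})
	go func() { // stand-in for the QoS worker picking the item off the queue
		defer close(done)
		work(ctx)
	}()

	select {
	case <-done:
		return nil // finished before the deadline
	case <-ctx.Done():
		return ctx.Err() // queueing or execution exceeded the timeout
	}
}

func main() {
	err := runWithTimeout(context.Background(), 100*time.Millisecond, func(ctx context.Context) {
		time.Sleep(1 * time.Second) // simulated slow storage call; real work should also honor ctx
	})
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // prints true
}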

View File

@ -28,15 +28,6 @@ import (
"github.com/grafana/grafana/pkg/util/scheduler"
)
- const (
- // DefaultMaxBackoff is the default maximum backoff duration for enqueue operations.
- DefaultMaxBackoff = 1 * time.Second
- // DefaultMinBackoff is the default minimum backoff duration for enqueue operations.
- DefaultMinBackoff = 100 * time.Millisecond
- // DefaultMaxRetries is the default maximum number of retries for enqueue operations.
- DefaultMaxRetries = 3
- )
// ResourceServer implements all gRPC services
type ResourceServer interface {
resourcepb.ResourceStoreServer
@ -162,6 +153,13 @@ type QOSEnqueuer interface {
Enqueue(ctx context.Context, tenantID string, runnable func()) error
}
+ type QueueConfig struct {
+ MaxBackoff time.Duration
+ MinBackoff time.Duration
+ MaxRetries int
+ Timeout time.Duration
+ }
type BlobConfig struct {
// The CDK configuration URL
URL string
@ -240,7 +238,8 @@ type ResourceServerOptions struct {
MaxPageSizeBytes int
// QOSQueue is the quality of service queue used to enqueue
- QOSQueue QOSEnqueuer
+ QOSQueue QOSEnqueuer
+ QOSConfig QueueConfig
Ring *ring.Ring
RingLifecycler *ring.BasicLifecycler
@ -281,6 +280,19 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
opts.QOSQueue = scheduler.NewNoopQueue()
}
+ if opts.QOSConfig.Timeout == 0 {
+ opts.QOSConfig.Timeout = 30 * time.Second
+ }
+ if opts.QOSConfig.MaxBackoff == 0 {
+ opts.QOSConfig.MaxBackoff = 1 * time.Second
+ }
+ if opts.QOSConfig.MinBackoff == 0 {
+ opts.QOSConfig.MinBackoff = 100 * time.Millisecond
+ }
+ if opts.QOSConfig.MaxRetries == 0 {
+ opts.QOSConfig.MaxRetries = 3
+ }
// Initialize the blob storage
blobstore := opts.Blob.Backend
if blobstore == nil {
@ -326,6 +338,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
maxPageSizeBytes: opts.MaxPageSizeBytes,
reg: opts.Reg,
queue: opts.QOSQueue,
+ queueConfig: opts.QOSConfig,
}
if opts.Search.Resources != nil {
@ -375,6 +388,7 @@ type server struct {
maxPageSizeBytes int
reg prometheus.Registerer
queue QOSEnqueuer
+ queueConfig QueueConfig
}
// Init implements ResourceServer.
@ -635,8 +649,8 @@ func (s *server) Create(ctx context.Context, req *resourcepb.CreateRequest) (*re
res *resourcepb.CreateResponse
err error
)
- runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
- res, err = s.create(ctx, user, req)
+ runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
+ res, err = s.create(queueCtx, user, req)
})
if runErr != nil {
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.CreateResponse {
@ -689,8 +703,8 @@ func (s *server) Update(ctx context.Context, req *resourcepb.UpdateRequest) (*re
res *resourcepb.UpdateResponse
err error
)
- runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
- res, err = s.update(ctx, user, req)
+ runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
+ res, err = s.update(queueCtx, user, req)
})
if runErr != nil {
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.UpdateResponse {
@ -757,8 +771,8 @@ func (s *server) Delete(ctx context.Context, req *resourcepb.DeleteRequest) (*re
err error
)
- runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
- res, err = s.delete(ctx, user, req)
+ runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
+ res, err = s.delete(queueCtx, user, req)
})
if runErr != nil {
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.DeleteResponse {
@ -868,8 +882,8 @@ func (s *server) Read(ctx context.Context, req *resourcepb.ReadRequest) (*resour
res *resourcepb.ReadResponse
err error
)
- runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
- res, err = s.read(ctx, user, req)
+ runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
+ res, err = s.read(queueCtx, user, req)
})
if runErr != nil {
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.ReadResponse {
@ -1375,40 +1389,44 @@ func (s *server) GetBlob(ctx context.Context, req *resourcepb.GetBlobRequest) (*
return rsp, nil
}
- func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func()) error {
- boff := backoff.New(ctx, backoff.Config{
- MinBackoff: DefaultMinBackoff,
- MaxBackoff: DefaultMaxBackoff,
- MaxRetries: DefaultMaxRetries,
+ func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func(ctx context.Context)) error {
+ // Enforce a timeout for the entire operation, including queueing and execution.
+ queueCtx, cancel := context.WithTimeout(ctx, s.queueConfig.Timeout)
+ defer cancel()
+ done := make(chan struct{})
+ wrappedRunnable := func() {
+ defer close(done)
+ runnable(queueCtx)
+ }
+ // Retry enqueueing with backoff, respecting the timeout context.
+ boff := backoff.New(queueCtx, backoff.Config{
+ MinBackoff: s.queueConfig.MinBackoff,
+ MaxBackoff: s.queueConfig.MaxBackoff,
+ MaxRetries: s.queueConfig.MaxRetries,
})
- var (
- wg sync.WaitGroup
- err error
- )
- wg.Add(1)
- wrapped := func() {
- defer wg.Done()
- runnable()
- }
- for boff.Ongoing() {
- err = s.queue.Enqueue(ctx, tenantID, wrapped)
+ for {
+ err := s.queue.Enqueue(queueCtx, tenantID, wrappedRunnable)
if err == nil {
+ // Successfully enqueued.
break
}
- s.log.Warn("failed to enqueue runnable, retrying",
- "maxRetries", DefaultMaxRetries,
- "tenantID", tenantID,
- "error", err)
+ s.log.Warn("failed to enqueue runnable, retrying", "tenantID", tenantID, "error", err)
+ if !boff.Ongoing() {
+ // Backoff finished (retries exhausted or context canceled).
+ return fmt.Errorf("failed to enqueue for tenant %s: %w", tenantID, err)
+ }
boff.Wait()
}
- if err != nil {
- s.log.Error("failed to enqueue runnable",
- "maxRetries", DefaultMaxRetries,
- "tenantID", tenantID,
- "error", err)
- return fmt.Errorf("failed to enqueue runnable for tenant %s: %w", tenantID, err)
+ // Wait for the runnable to complete or for the context to be done.
+ select {
+ case <-done:
+ return nil // Completed successfully.
+ case <-queueCtx.Done():
+ return queueCtx.Err() // Timed out or canceled while waiting for execution.
+ }
- wg.Wait()
- return nil
}
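
The retry loop above gives up once the dskit backoff is exhausted or the queue context expires, wrapping the last enqueue error for the caller. Below is a minimal, self-contained sketch of that enqueue-with-retry behaviour against a toy bounded queue; toyQueue, enqueueWithRetry and errQueueFull are illustrative stand-ins, while github.com/grafana/dskit/backoff is the library the server code actually uses.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

var errQueueFull = errors.New("tenant queue full")

// toyQueue stands in for the QoS scheduler queue: a bounded channel per tenant.
type toyQueue struct{ items chan func() }

func (q *toyQueue) Enqueue(_ context.Context, _ string, runnable func()) error {
	select {
	case q.items <- runnable:
		return nil
	default:
		return errQueueFull
	}
}

// enqueueWithRetry mirrors the loop in runInQueue: retry failed enqueues with
// backoff until MaxRetries is exhausted or the context is done.
func enqueueWithRetry(ctx context.Context, q *toyQueue, tenantID string, runnable func()) error {
	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: 10 * time.Millisecond,
		MaxBackoff: 100 * time.Millisecond,
		MaxRetries: 3,
	})
	for {
		err := q.Enqueue(ctx, tenantID, runnable)
		if err == nil {
			return nil
		}
		if !boff.Ongoing() { // retries exhausted or context canceled
			return fmt.Errorf("failed to enqueue for tenant %s: %w", tenantID, err)
		}
		boff.Wait()
	}
}

func main() {
	q := &toyQueue{items: make(chan func(), 1)}
	q.items <- func() {} // fill the queue so every enqueue attempt fails

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := enqueueWithRetry(ctx, q, "tenant-a", func() {})
	fmt.Println(errors.Is(err, errQueueFull)) // prints true after the retries are used up
}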

View File

@ -4,20 +4,27 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"sync"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gocloud.dev/blob/fileblob"
"gocloud.dev/blob/memblob"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
authlib "github.com/grafana/authlib/types"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/scheduler"
)
func TestSimpleServer(t *testing.T) {
@ -242,3 +249,105 @@ func TestSimpleServer(t *testing.T) {
require.ErrorIs(t, err, ErrOptimisticLockingFailed)
})
}
+ func TestRunInQueue(t *testing.T) {
+ const testTenantID = "test-tenant"
+ t.Run("should execute successfully when queue has capacity", func(t *testing.T) {
+ s, _ := newTestServerWithQueue(t, 1, 1)
+ executed := make(chan bool, 1)
+ runnable := func(ctx context.Context) {
+ executed <- true
+ }
+ err := s.runInQueue(context.Background(), testTenantID, runnable)
+ require.NoError(t, err)
+ assert.True(t, <-executed, "runnable should have been executed")
+ })
+ t.Run("should time out if a task is sitting in the queue beyond the timeout", func(t *testing.T) {
+ s, _ := newTestServerWithQueue(t, 1, 1)
+ executed := make(chan struct{}, 1)
+ runnable := func(ctx context.Context) {
+ time.Sleep(1 * time.Second)
+ executed <- struct{}{}
+ }
+ err := s.runInQueue(context.Background(), testTenantID, runnable)
+ require.Error(t, err)
+ assert.Equal(t, context.DeadlineExceeded, err)
+ <-executed
+ })
+ t.Run("should return an error if queue is consistently full after retrying", func(t *testing.T) {
+ s, q := newTestServerWithQueue(t, 1, 1)
+ // Task 1: This will be picked up by the worker and block it.
+ blocker := make(chan struct{})
+ defer close(blocker)
+ blockingRunnable := func() {
+ <-blocker
+ }
+ err := q.Enqueue(context.Background(), testTenantID, blockingRunnable)
+ require.NoError(t, err)
+ for q.Len() > 0 {
+ time.Sleep(100 * time.Millisecond)
+ }
+ err = q.Enqueue(context.Background(), testTenantID, blockingRunnable)
+ require.NoError(t, err)
+ // Task 2: This runnable should never execute because the queue is full.
+ mu := sync.Mutex{}
+ executed := false
+ runnable := func(ctx context.Context) {
+ mu.Lock()
+ defer mu.Unlock()
+ executed = true
+ }
+ err = s.runInQueue(context.Background(), testTenantID, runnable)
+ require.Error(t, err)
+ require.ErrorIs(t, err, scheduler.ErrTenantQueueFull)
+ require.False(t, executed, "runnable should not have been executed")
+ })
+ }
+ // newTestServerWithQueue creates a server with a real scheduler.Queue for testing.
+ // It also sets up a worker to consume items from the queue.
+ func newTestServerWithQueue(t *testing.T, maxSizePerTenant int, numWorkers int) (*server, *scheduler.Queue) {
+ t.Helper()
+ q := scheduler.NewQueue(&scheduler.QueueOptions{
+ MaxSizePerTenant: maxSizePerTenant,
+ Registerer: prometheus.NewRegistry(),
+ Logger: log.NewNopLogger(),
+ })
+ err := services.StartAndAwaitRunning(context.Background(), q)
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ err := services.StopAndAwaitTerminated(context.Background(), q)
+ require.NoError(t, err)
+ })
+ // Create a worker to consume from the queue
+ worker, err := scheduler.NewScheduler(q, &scheduler.Config{
+ Logger: log.NewNopLogger(),
+ NumWorkers: numWorkers,
+ })
+ require.NoError(t, err)
+ err = services.StartAndAwaitRunning(context.Background(), worker)
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ err := services.StopAndAwaitTerminated(context.Background(), worker)
+ require.NoError(t, err)
+ })
+ s := &server{
+ queue: q,
+ queueConfig: QueueConfig{
+ Timeout: 500 * time.Millisecond,
+ MaxRetries: 2,
+ MinBackoff: 10 * time.Millisecond,
+ },
+ log: slog.Default(),
+ }
+ return s, q
+ }
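
One consequence worth noting (it follows from standard context semantics rather than anything specific to this commit): because queueCtx is derived from the incoming request context, context.WithTimeout can only shorten the deadline, never extend it, so the effective limit is whichever of the caller's deadline and QOSConfig.Timeout expires first. A small illustration with arbitrary values:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// The caller (for example a gRPC request) already carries a 50ms deadline...
	parent, cancelParent := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancelParent()

	// ...and the queue layers its own, much larger timeout on top (30s is the new default).
	queueCtx, cancelQueue := context.WithTimeout(parent, 30*time.Second)
	defer cancelQueue()

	// The effective deadline is the earlier of the two.
	deadline, _ := queueCtx.Deadline()
	fmt.Println(time.Until(deadline) < time.Second) // prints true: the caller's 50ms still wins
}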