diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go
index f5dc2b7f2a5..2152011d901 100644
--- a/pkg/storage/unified/resource/server.go
+++ b/pkg/storage/unified/resource/server.go
@@ -28,15 +28,6 @@ import (
 	"github.com/grafana/grafana/pkg/util/scheduler"
 )
 
-const (
-	// DefaultMaxBackoff is the default maximum backoff duration for enqueue operations.
-	DefaultMaxBackoff = 1 * time.Second
-	// DefaultMinBackoff is the default minimum backoff duration for enqueue operations.
-	DefaultMinBackoff = 100 * time.Millisecond
-	// DefaultMaxRetries is the default maximum number of retries for enqueue operations.
-	DefaultMaxRetries = 3
-)
-
 // ResourceServer implements all gRPC services
 type ResourceServer interface {
 	resourcepb.ResourceStoreServer
@@ -162,6 +153,13 @@ type QOSEnqueuer interface {
 	Enqueue(ctx context.Context, tenantID string, runnable func()) error
 }
 
+type QueueConfig struct {
+	MaxBackoff time.Duration // maximum delay between enqueue retries
+	MinBackoff time.Duration // initial delay between enqueue retries
+	MaxRetries int           // number of enqueue attempts before giving up
+	Timeout    time.Duration // overall budget for queueing plus execution
+}
+
 type BlobConfig struct {
 	// The CDK configuration URL
 	URL string
@@ -240,7 +238,8 @@ type ResourceServerOptions struct {
 	MaxPageSizeBytes int
 
 	// QOSQueue is the quality of service queue used to enqueue
-	QOSQueue QOSEnqueuer
+	QOSQueue  QOSEnqueuer
+	QOSConfig QueueConfig
 
 	Ring           *ring.Ring
 	RingLifecycler *ring.BasicLifecycler
@@ -281,6 +280,19 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
 		opts.QOSQueue = scheduler.NewNoopQueue()
 	}
 
+	if opts.QOSConfig.Timeout == 0 {
+		opts.QOSConfig.Timeout = 30 * time.Second
+	}
+	if opts.QOSConfig.MaxBackoff == 0 {
+		opts.QOSConfig.MaxBackoff = 1 * time.Second
+	}
+	if opts.QOSConfig.MinBackoff == 0 {
+		opts.QOSConfig.MinBackoff = 100 * time.Millisecond
+	}
+	if opts.QOSConfig.MaxRetries == 0 {
+		opts.QOSConfig.MaxRetries = 3
+	}
+
 	// Initialize the blob storage
 	blobstore := opts.Blob.Backend
 	if blobstore == nil {
@@ -326,6 +338,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) {
 		maxPageSizeBytes: opts.MaxPageSizeBytes,
 		reg:              opts.Reg,
 		queue:            opts.QOSQueue,
+		queueConfig:      opts.QOSConfig,
 	}
 
 	if opts.Search.Resources != nil {
@@ -375,6 +388,7 @@ type server struct {
 	maxPageSizeBytes int
 	reg              prometheus.Registerer
 	queue            QOSEnqueuer
+	queueConfig      QueueConfig
 }
 
 // Init implements ResourceServer.
@@ -635,8 +649,8 @@ func (s *server) Create(ctx context.Context, req *resourcepb.CreateRequest) (*re
 		res *resourcepb.CreateResponse
 		err error
 	)
-	runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
-		res, err = s.create(ctx, user, req)
+	runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
+		res, err = s.create(queueCtx, user, req)
 	})
 	if runErr != nil {
 		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.CreateResponse {
@@ -689,8 +703,8 @@ func (s *server) Update(ctx context.Context, req *resourcepb.UpdateRequest) (*re
 		res *resourcepb.UpdateResponse
 		err error
 	)
-	runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
-		res, err = s.update(ctx, user, req)
+	runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
+		res, err = s.update(queueCtx, user, req)
 	})
 	if runErr != nil {
 		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.UpdateResponse {
@@ -757,8 +771,8 @@ func (s *server) Delete(ctx context.Context, req *resourcepb.DeleteRequest) (*re
 		err error
 	)
 
-	runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
-		res, err = s.delete(ctx, user, req)
+	runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
+		res, err = s.delete(queueCtx, user, req)
 	})
 	if runErr != nil {
 		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.DeleteResponse {
@@ -868,8 +882,8 @@ func (s *server) Read(ctx context.Context, req *resourcepb.ReadRequest) (*resour
 		res *resourcepb.ReadResponse
 		err error
 	)
-	runErr := s.runInQueue(ctx, req.Key.Namespace, func() {
-		res, err = s.read(ctx, user, req)
+	runErr := s.runInQueue(ctx, req.Key.Namespace, func(queueCtx context.Context) {
+		res, err = s.read(queueCtx, user, req)
 	})
 	if runErr != nil {
 		return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.ReadResponse {
@@ -1375,40 +1389,44 @@ func (s *server) GetBlob(ctx context.Context, req *resourcepb.GetBlobRequest) (*
 	return rsp, nil
 }
 
-func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func()) error {
-	boff := backoff.New(ctx, backoff.Config{
-		MinBackoff: DefaultMinBackoff,
-		MaxBackoff: DefaultMaxBackoff,
-		MaxRetries: DefaultMaxRetries,
+func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func(ctx context.Context)) error {
+	// Enforce a timeout for the entire operation, including queueing and execution.
+	queueCtx, cancel := context.WithTimeout(ctx, s.queueConfig.Timeout)
+	defer cancel()
+
+	done := make(chan struct{})
+	wrappedRunnable := func() {
+		defer close(done)
+		runnable(queueCtx)
+	}
+
+	// Retry enqueueing with backoff, respecting the timeout context.
+	boff := backoff.New(queueCtx, backoff.Config{
+		MinBackoff: s.queueConfig.MinBackoff,
+		MaxBackoff: s.queueConfig.MaxBackoff,
+		MaxRetries: s.queueConfig.MaxRetries,
 	})
-	var (
-		wg  sync.WaitGroup
-		err error
-	)
-	wg.Add(1)
-	wrapped := func() {
-		defer wg.Done()
-		runnable()
-	}
-	for boff.Ongoing() {
-		err = s.queue.Enqueue(ctx, tenantID, wrapped)
+	for {
+		err := s.queue.Enqueue(queueCtx, tenantID, wrappedRunnable)
 		if err == nil {
+			// Successfully enqueued.
			break
 		}
-		s.log.Warn("failed to enqueue runnable, retrying",
-			"maxRetries", DefaultMaxRetries,
-			"tenantID", tenantID,
-			"error", err)
+
+		if !boff.Ongoing() {
+			// Backoff finished (retries exhausted or context canceled).
+			return fmt.Errorf("failed to enqueue for tenant %s: %w", tenantID, err)
+		}
+		s.log.Warn("failed to enqueue runnable, retrying", "tenantID", tenantID, "error", err)
 		boff.Wait()
 	}
-	if err != nil {
-		s.log.Error("failed to enqueue runnable",
-			"maxRetries", DefaultMaxRetries,
-			"tenantID", tenantID,
-			"error", err)
-		return fmt.Errorf("failed to enqueue runnable for tenant %s: %w", tenantID, err)
+
+	// Wait for the runnable to complete or for the context to be done.
+	select {
+	case <-done:
+		return nil // Completed successfully.
+	case <-queueCtx.Done():
+		return queueCtx.Err() // Timed out or canceled while waiting for execution.
 	}
-	wg.Wait()
-	return nil
 }
diff --git a/pkg/storage/unified/resource/server_test.go b/pkg/storage/unified/resource/server_test.go
index b0dff754a18..98bc928da63 100644
--- a/pkg/storage/unified/resource/server_test.go
+++ b/pkg/storage/unified/resource/server_test.go
@@ -4,20 +4,27 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"log/slog"
 	"net/http"
 	"os"
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"gocloud.dev/blob/fileblob"
 	"gocloud.dev/blob/memblob"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 
 	authlib "github.com/grafana/authlib/types"
+	"github.com/grafana/dskit/services"
 	"github.com/grafana/grafana/pkg/apimachinery/identity"
 	"github.com/grafana/grafana/pkg/apimachinery/utils"
+	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
+	"github.com/grafana/grafana/pkg/util/scheduler"
 )
 
 func TestSimpleServer(t *testing.T) {
@@ -242,3 +249,110 @@ func TestSimpleServer(t *testing.T) {
 		require.ErrorIs(t, err, ErrOptimisticLockingFailed)
 	})
 }
+
+func TestRunInQueue(t *testing.T) {
+	const testTenantID = "test-tenant"
+	t.Run("should execute successfully when queue has capacity", func(t *testing.T) {
+		s, _ := newTestServerWithQueue(t, 1, 1)
+		executed := make(chan bool, 1)
+
+		runnable := func(ctx context.Context) {
+			executed <- true
+		}
+
+		err := s.runInQueue(context.Background(), testTenantID, runnable)
+		require.NoError(t, err)
+		assert.True(t, <-executed, "runnable should have been executed")
+	})
+
+	t.Run("should time out when execution runs beyond the timeout", func(t *testing.T) {
+		s, _ := newTestServerWithQueue(t, 1, 1)
+		executed := make(chan struct{}, 1)
+		runnable := func(ctx context.Context) {
+			time.Sleep(1 * time.Second)
+			executed <- struct{}{}
+		}
+
+		err := s.runInQueue(context.Background(), testTenantID, runnable)
+		require.Error(t, err)
+		require.ErrorIs(t, err, context.DeadlineExceeded)
+		<-executed // wait for the sleeping runnable so it does not leak into other tests
+	})
+
+	t.Run("should return an error if queue is consistently full after retrying", func(t *testing.T) {
+		s, q := newTestServerWithQueue(t, 1, 1)
+		// Task 1: picked up by the worker, which then blocks on the channel.
+		blocker := make(chan struct{})
+		defer close(blocker)
+		blockingRunnable := func() {
+			<-blocker
+		}
+		err := q.Enqueue(context.Background(), testTenantID, blockingRunnable)
+		require.NoError(t, err)
+		// Wait until the worker has dequeued Task 1, freeing the only queue slot.
+		for q.Len() > 0 {
+			time.Sleep(100 * time.Millisecond)
+		}
+		// Task 2: fills the single-slot queue while the worker stays blocked.
+		err = q.Enqueue(context.Background(), testTenantID, blockingRunnable)
+		require.NoError(t, err)
+
+		// Task 3: should never execute because the queue is full.
+		mu := sync.Mutex{}
+		executed := false
+		runnable := func(ctx context.Context) {
+			mu.Lock()
+			defer mu.Unlock()
+			executed = true
+		}
+
+		err = s.runInQueue(context.Background(), testTenantID, runnable)
+		require.Error(t, err)
+		require.ErrorIs(t, err, scheduler.ErrTenantQueueFull)
+		mu.Lock()
+		defer mu.Unlock()
+		require.False(t, executed, "runnable should not have been executed")
+	})
+}
+
+// newTestServerWithQueue creates a server with a real scheduler.Queue for testing.
+// It also sets up a worker to consume items from the queue.
+func newTestServerWithQueue(t *testing.T, maxSizePerTenant int, numWorkers int) (*server, *scheduler.Queue) {
+	t.Helper()
+	q := scheduler.NewQueue(&scheduler.QueueOptions{
+		MaxSizePerTenant: maxSizePerTenant,
+		Registerer:       prometheus.NewRegistry(),
+		Logger:           log.NewNopLogger(),
+	})
+	err := services.StartAndAwaitRunning(context.Background(), q)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		err := services.StopAndAwaitTerminated(context.Background(), q)
+		require.NoError(t, err)
+	})
+
+	// Create a worker to consume from the queue
+	worker, err := scheduler.NewScheduler(q, &scheduler.Config{
+		Logger:     log.NewNopLogger(),
+		NumWorkers: numWorkers,
+	})
+	require.NoError(t, err)
+	err = services.StartAndAwaitRunning(context.Background(), worker)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		err := services.StopAndAwaitTerminated(context.Background(), worker)
+		require.NoError(t, err)
+	})
+
+	s := &server{
+		queue: q,
+		queueConfig: QueueConfig{
+			Timeout:    500 * time.Millisecond,
+			MaxRetries: 2,
+			MinBackoff: 10 * time.Millisecond,
+			MaxBackoff: 50 * time.Millisecond,
+		},
+		log: slog.Default(),
+	}
+	return s, q
+}
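
For context on how the new knobs are meant to be used, here is a minimal sketch (not part of this patch) of wiring `QOSConfig` into `ResourceServerOptions`. The values are illustrative; all other options (backend, search, blob, etc.) are elided, and any zero-valued `QueueConfig` field falls back to the defaults applied in `NewResourceServer` (30s timeout, 100ms/1s min/max backoff, 3 retries).

```go
package main

import (
	"time"

	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/util/scheduler"
)

func main() {
	opts := resource.ResourceServerOptions{
		// All other required options (backend, search, blob, ...) are elided
		// for brevity; a real caller must still supply them.
		QOSQueue: scheduler.NewNoopQueue(), // swap in a real scheduler.Queue in production
		QOSConfig: resource.QueueConfig{
			Timeout:    10 * time.Second,       // total budget: queueing plus execution
			MinBackoff: 50 * time.Millisecond,  // first delay after a rejected enqueue
			MaxBackoff: 500 * time.Millisecond, // cap on the retry delay
			MaxRetries: 5,                      // enqueue attempts before giving up
		},
	}
	srv, err := resource.NewResourceServer(opts)
	if err != nil {
		panic(err) // likely here, since the required options are elided; sketch only
	}
	_ = srv
}
```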
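The waiting logic is easiest to reason about in isolation. The sketch below is a hypothetical reduction of the `runInQueue` pattern (`toyEnqueuer` and `runBounded` are stand-ins, not code from this patch): a single `context.WithTimeout` bounds both time spent queued and execution, the runnable signals completion by closing `done`, and the caller returns on whichever of `done` or `queueCtx.Done()` fires first. Passing `queueCtx` into the work function is what lets an abandoned runnable observe the cancellation, which is why the handlers above now receive `queueCtx` instead of capturing the request context.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// toyEnqueuer is a hypothetical stand-in for QOSEnqueuer that runs
// each runnable on its own goroutine as soon as it is enqueued.
type toyEnqueuer struct{}

func (toyEnqueuer) Enqueue(_ context.Context, _ string, runnable func()) error {
	go runnable()
	return nil
}

// runBounded mirrors the runInQueue shape: enqueue the work, then wait for
// completion or for the shared timeout context to expire, whichever is first.
func runBounded(ctx context.Context, q toyEnqueuer, timeout time.Duration, work func(context.Context)) error {
	queueCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	done := make(chan struct{})
	if err := q.Enqueue(queueCtx, "tenant", func() {
		defer close(done)
		work(queueCtx)
	}); err != nil {
		return err
	}

	select {
	case <-done:
		return nil // work finished within the budget
	case <-queueCtx.Done():
		return queueCtx.Err() // caller is released; the runnable sees a canceled ctx
	}
}

func main() {
	err := runBounded(context.Background(), toyEnqueuer{}, 100*time.Millisecond, func(ctx context.Context) {
		select {
		case <-time.After(time.Second): // slow work
		case <-ctx.Done(): // unblocks at the 100ms deadline
		}
	})
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // prints: true
}
```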