Background Services: Remove dskitBackgroundServices toggle (#111255)

This commit is contained in:
Todd Treece 2025-09-26 15:16:06 -04:00 committed by GitHub
parent 052b6e3dfd
commit a333e8a8da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 96 additions and 215 deletions

View File

@ -1169,11 +1169,6 @@ export interface FeatureToggles {
*/
prometheusTypeMigration?: boolean;
/**
* Enables dskit background service wrapper
* @default false
*/
dskitBackgroundServices?: boolean;
/**
* Enables running plugins in containers
* @default false
*/

View File

@ -23,6 +23,7 @@ type Registry interface {
}
// Manager is the composite interface formed by embedding dskit's
// services.NamedService together with the Registry and Engine interfaces
// declared in this package.
type Manager interface {
services.NamedService
Registry
Engine
}

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/grafana/dskit/services"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
@ -16,7 +17,7 @@ var (
stopTimeout = 30 * time.Second
)
type managerAdapter struct {
type ManagerAdapter struct {
services.NamedService
reg registry.BackgroundServiceRegistry
@ -32,8 +33,8 @@ type managerAdapter struct {
// - Graceful shutdown with proper cleanup ordering
//
// Services implementing CanBeDisabled that are disabled will be skipped.
func NewManagerAdapter(reg registry.BackgroundServiceRegistry) *managerAdapter {
m := &managerAdapter{
func NewManagerAdapter(reg registry.BackgroundServiceRegistry) *ManagerAdapter {
m := &ManagerAdapter{
reg: reg,
dependencyMap: dependencyMap(),
}
@ -41,7 +42,12 @@ func NewManagerAdapter(reg registry.BackgroundServiceRegistry) *managerAdapter {
return m
}
func (m *managerAdapter) starting(ctx context.Context) error {
// WithDependencies replaces the adapter's dependency map with dependencyMap
// and returns the same adapter, so the call can be chained onto
// NewManagerAdapter. The map is stored as given, not copied.
func (m *ManagerAdapter) WithDependencies(dependencyMap map[string][]string) *ManagerAdapter {
m.dependencyMap = dependencyMap
return m
}
func (m *ManagerAdapter) starting(ctx context.Context) error {
spanCtx, span := tracing.Start(ctx, "backgroundsvcs.managerAdapter.starting")
defer span.End()
logger := log.New("backgroundsvcs.managerAdapter").FromContext(spanCtx)
@ -76,16 +82,20 @@ func (m *managerAdapter) starting(ctx context.Context) error {
manager.RegisterModule(BackgroundServices, nil)
m.manager = manager
return nil
if err := m.manager.StartAsync(ctx); err != nil {
return err
}
return m.manager.AwaitRunning(ctx)
}
func (m *managerAdapter) running(ctx context.Context) error {
spanCtx, span := tracing.Start(ctx, "backgroundsvcs.managerAdapter.running")
func (m *ManagerAdapter) running(ctx context.Context) error {
newCtx := trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))
spanCtx, span := tracing.Start(newCtx, "backgroundsvcs.managerAdapter.running")
defer span.End()
return m.manager.Run(spanCtx)
return m.manager.AwaitTerminated(spanCtx)
}
func (m *managerAdapter) stopping(failure error) error {
func (m *ManagerAdapter) stopping(failure error) error {
ctx, cancel := context.WithTimeout(context.Background(), stopTimeout)
defer cancel()
spanCtx, span := tracing.Start(ctx, "backgroundsvcs.managerAdapter.stopping")
@ -98,15 +108,16 @@ func (m *managerAdapter) stopping(failure error) error {
}
// Run initializes and starts all background services using dskit's module and service patterns.
func (m *managerAdapter) Run(ctx context.Context) error {
func (m *ManagerAdapter) Run(ctx context.Context) error {
if err := m.StartAsync(ctx); err != nil {
return err
}
return m.AwaitTerminated(ctx)
stopCtx := trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))
return m.AwaitTerminated(stopCtx)
}
// Shutdown asks the adapter to stop asynchronously and then waits for it to terminate.
func (m *managerAdapter) Shutdown(ctx context.Context, reason string) error {
func (m *ManagerAdapter) Shutdown(ctx context.Context, reason string) error {
m.StopAsync()
return m.AwaitTerminated(ctx)
}

View File

@ -28,10 +28,10 @@ func TestNewManagerAdapter(t *testing.T) {
func TestManagerAdapter_Starting(t *testing.T) {
t.Run("empty registry initializes manager", func(t *testing.T) {
reg := &mockBackgroundServiceRegistry{services: []registry.BackgroundService{}}
adapter := NewManagerAdapter(reg)
adapter.dependencyMap = map[string][]string{
BackgroundServices: {},
}
adapter := NewManagerAdapter(reg).WithDependencies(map[string][]string{
BackgroundServices: {Core},
Core: {},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -55,10 +55,10 @@ func TestManagerAdapter_Starting(t *testing.T) {
reg := &mockBackgroundServiceRegistry{
services: []registry.BackgroundService{enabledSvc, disabledSvc, namedSvc},
}
adapter := NewManagerAdapter(reg)
adapter.dependencyMap = map[string][]string{
BackgroundServices: {},
}
adapter := NewManagerAdapter(reg).WithDependencies(map[string][]string{
BackgroundServices: {Core},
Core: {},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -110,8 +110,11 @@ func TestManagerAdapter_Starting(t *testing.T) {
// Pre-populate the dependency map with the service using the actual service name that will be used
serviceName := "*adapter.mockNamedService"
adapter.dependencyMap[serviceName] = []string{"custom-dependency"}
initialBgDeps := append([]string{}, adapter.dependencyMap[BackgroundServices]...)
adapter.WithDependencies(map[string][]string{
serviceName: {BackgroundServices},
Core: {},
BackgroundServices: {Core},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -121,18 +124,20 @@ func TestManagerAdapter_Starting(t *testing.T) {
require.NotNil(t, adapter.manager)
// Verify the existing dependency was not overwritten
require.Equal(t, []string{"custom-dependency"}, adapter.dependencyMap[serviceName])
require.Equal(t, []string{BackgroundServices}, adapter.dependencyMap[serviceName])
// Verify BackgroundServices dependencies were not modified (should not contain the service twice)
finalBgDeps := adapter.dependencyMap[BackgroundServices]
require.Equal(t, initialBgDeps, finalBgDeps)
require.Equal(t, []string{Core}, finalBgDeps)
})
t.Run("service without NamedService interface gets wrapped", func(t *testing.T) {
// Create a service that doesn't implement NamedService
plainSvc := &mockService{}
reg := &mockBackgroundServiceRegistry{services: []registry.BackgroundService{plainSvc}}
adapter := NewManagerAdapter(reg)
adapter := NewManagerAdapter(reg).WithDependencies(map[string][]string{
BackgroundServices: {Core},
Core: {},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -151,11 +156,13 @@ func TestManagerAdapter_Starting(t *testing.T) {
})
t.Run("service without CanBeDisabled interface is always enabled", func(t *testing.T) {
// Create a service that doesn't implement CanBeDisabled
simpleSvc := &simpleBackgroundService{}
reg := &mockBackgroundServiceRegistry{services: []registry.BackgroundService{simpleSvc}}
adapter := NewManagerAdapter(reg)
adapter := NewManagerAdapter(reg).WithDependencies(map[string][]string{
BackgroundServices: {Core},
Core: {},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -168,36 +175,18 @@ func TestManagerAdapter_Starting(t *testing.T) {
expectedServiceName := reflect.TypeOf(simpleSvc).String()
require.Contains(t, adapter.dependencyMap, expectedServiceName)
})
t.Run("real manager integration test", func(t *testing.T) {
testSvc := &mockService{}
reg := &mockBackgroundServiceRegistry{services: []registry.BackgroundService{testSvc}}
adapter := NewManagerAdapter(reg)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
// Use the real manager - this tests actual integration
err := adapter.starting(ctx)
require.NoError(t, err)
require.NotNil(t, adapter.manager)
// Verify the service was registered in dependency map
expectedServiceName := reflect.TypeOf(testSvc).String()
require.Contains(t, adapter.dependencyMap, expectedServiceName)
})
}
func TestManagerAdapter_Running(t *testing.T) {
t.Run("runs with real manager", func(t *testing.T) {
t.Run("runs with manager", func(t *testing.T) {
mock := &mockNamedService{name: "mock"}
reg := &mockBackgroundServiceRegistry{services: []registry.BackgroundService{
mock,
}}
adapter := NewManagerAdapter(reg)
adapter.dependencyMap = map[string][]string{
BackgroundServices: {},
}
adapter := NewManagerAdapter(reg).WithDependencies(map[string][]string{
BackgroundServices: {Core},
Core: {},
})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -208,25 +197,6 @@ func TestManagerAdapter_Running(t *testing.T) {
err = adapter.AwaitRunning(ctx)
require.NoError(t, err)
})
t.Run("running delegates to manager", func(t *testing.T) {
reg := &mockBackgroundServiceRegistry{services: []registry.BackgroundService{}}
adapter := NewManagerAdapter(reg)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
// Initialize with real manager
err := adapter.starting(ctx)
require.NoError(t, err)
// Test running method directly - this will likely fail due to missing production modules
// but it covers the running method code path
err = adapter.running(ctx)
if err != nil {
require.Contains(t, err.Error(), "no such module")
}
})
}
func TestManagerAdapter_Stopping(t *testing.T) {
@ -262,7 +232,6 @@ func TestManagerAdapter_Stopping(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
// Initialize the manager first - need to go through starting to initialize manager
err := adapter.starting(ctx)
require.NoError(t, err)
require.NotNil(t, adapter.manager)

View File

@ -2,19 +2,14 @@ package server
import (
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"reflect"
"strconv"
"sync"
"github.com/grafana/dskit/modules"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/api"
_ "github.com/grafana/grafana/pkg/extensions"
@ -24,9 +19,7 @@ import (
"github.com/grafana/grafana/pkg/infra/usagestats/statscollector"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/registry/backgroundsvcs/adapter"
"github.com/grafana/grafana/pkg/semconv"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/provisioning"
"github.com/grafana/grafana/pkg/setting"
)
@ -66,18 +59,14 @@ func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleR
tracerProvider *tracing.TracingService,
promReg prometheus.Registerer,
) (*Server, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background())
childRoutines, childCtx := errgroup.WithContext(rootCtx)
rootCtx := context.Background()
s := &Server{
promReg: promReg,
context: childCtx,
childRoutines: childRoutines,
context: rootCtx,
HTTPServer: httpServer,
provisioningService: provisioningService,
roleRegistry: roleRegistry,
shutdownFn: shutdownFn,
shutdownFinished: make(chan struct{}),
log: log.New("server"),
cfg: cfg,
pidFile: opts.PidFile,
@ -86,6 +75,7 @@ func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleR
buildBranch: opts.BuildBranch,
backgroundServiceRegistry: backgroundServiceProvider,
tracerProvider: tracerProvider,
managerAdapter: adapter.NewManagerAdapter(backgroundServiceProvider),
}
return s, nil
@ -95,15 +85,12 @@ func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleR
// core Server implementation which starts the entire Grafana server. Use
// ModuleServer to launch specific modules.
type Server struct {
context context.Context
shutdownFn func()
childRoutines *errgroup.Group
log log.Logger
cfg *setting.Cfg
shutdownOnce sync.Once
shutdownFinished chan struct{}
isInitialized bool
mtx sync.Mutex
context context.Context
log log.Logger
cfg *setting.Cfg
shutdownOnce sync.Once
isInitialized bool
mtx sync.Mutex
pidFile string
version string
@ -117,6 +104,7 @@ type Server struct {
roleRegistry accesscontrol.RoleRegistry
provisioningService provisioning.ProvisioningService
promReg prometheus.Registerer
managerAdapter *adapter.ManagerAdapter
}
// Init initializes the server and its services.
@ -145,83 +133,14 @@ func (s *Server) Init() error {
}
func (s *Server) Run() error {
if s.cfg.IsFeatureToggleEnabled(featuremgmt.FlagDskitBackgroundServices) {
s.log.Debug("Running background services with dskit wrapper")
return s.dskitRun()
}
s.log.Debug("Running standard background services")
return s.backgroundServicesRun()
}
func (s *Server) dskitRun() error {
if err := s.Init(); err != nil {
return err
}
managerAdapter := adapter.NewManagerAdapter(s.backgroundServiceRegistry)
s.notifySystemd("READY=1")
ctx, span := s.tracerProvider.Start(s.context, "server.dskitRun")
defer span.End()
// override the shutdownFn (context cancel func) for now until the feature flag is removed.
// this is a temporary solution to ensure that the services are shutdown properly.
cancelFn := s.shutdownFn
s.shutdownFn = func() {
defer close(s.shutdownFinished)
s.log.Debug("Shutting down background services")
if err := managerAdapter.Shutdown(s.context, modules.ErrStopProcess.Error()); err != nil {
s.log.Error("Failed to shutdown background services", "error", err)
}
cancelFn()
}
return managerAdapter.Run(ctx)
}
func (s *Server) backgroundServicesRun() error {
ctx, span := s.tracerProvider.Start(s.context, "server.backgroundServicesRun")
defer span.End()
defer close(s.shutdownFinished)
if err := s.Init(); err != nil {
return err
}
services := s.backgroundServiceRegistry.GetServices()
// Start background services.
for _, svc := range services {
if registry.IsDisabled(svc) {
continue
}
service := svc
serviceName := reflect.TypeOf(service).String()
s.childRoutines.Go(func() error {
select {
case <-s.context.Done():
return s.context.Err()
default:
}
s.log.Debug("Starting background service", "service", serviceName)
span.AddEvent(fmt.Sprintf("%s start", serviceName), trace.WithAttributes(semconv.GrafanaServiceName(serviceName)))
err := service.Run(ctx)
// Do not return context.Canceled error since errgroup.Group only
// returns the first error to the caller - thus we can miss a more
// interesting error.
if err != nil && !errors.Is(err, context.Canceled) {
s.log.Error("Stopped background service", "service", serviceName, "reason", err)
return fmt.Errorf("%s run error: %w", serviceName, err)
}
s.log.Debug("Stopped background service", "service", serviceName, "reason", err)
return nil
})
}
ctx, span := s.tracerProvider.Start(s.context, "server.Run")
defer span.End()
s.notifySystemd("READY=1")
s.log.Debug("Waiting on services...")
return s.childRoutines.Wait()
return s.managerAdapter.Run(ctx)
}
// Shutdown initiates Grafana graceful shutdown. This shuts down all
@ -231,15 +150,15 @@ func (s *Server) Shutdown(ctx context.Context, reason string) error {
var err error
s.shutdownOnce.Do(func() {
s.log.Info("Shutdown started", "reason", reason)
// Call cancel func to stop background services.
s.shutdownFn()
// Wait for server to shut down
if shutdownErr := s.managerAdapter.Shutdown(ctx, "shutdown"); shutdownErr != nil {
s.log.Error("Failed to shutdown background services", "error", shutdownErr)
}
select {
case <-s.shutdownFinished:
s.log.Debug("Finished waiting for server to shut down")
case <-ctx.Done():
s.log.Warn("Timed out while waiting for server to shut down")
err = fmt.Errorf("timeout waiting for shutdown")
default:
s.log.Debug("Finished waiting for server to shut down")
}
})

View File

@ -13,6 +13,7 @@ import (
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/registry/backgroundsvcs"
"github.com/grafana/grafana/pkg/registry/backgroundsvcs/adapter"
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl"
"github.com/grafana/grafana/pkg/setting"
)
@ -52,6 +53,10 @@ func testServer(t *testing.T, services ...registry.BackgroundService) *Server {
t.Helper()
s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, backgroundsvcs.NewBackgroundServiceRegistry(services...), tracing.NewNoopTracerService(), prometheus.NewRegistry())
require.NoError(t, err)
s.managerAdapter.WithDependencies(map[string][]string{
adapter.Core: {},
adapter.BackgroundServices: {adapter.Core},
})
// Required to skip configuration initialization that causes
// DI errors in this test.
s.isInitialized = true
@ -62,33 +67,28 @@ func TestServer_Run_Error(t *testing.T) {
testErr := errors.New("boom")
s := testServer(t, newTestService(nil, false), newTestService(testErr, false))
err := s.Run()
require.ErrorIs(t, err, testErr)
require.Error(t, err)
require.Contains(t, err.Error(), testErr.Error())
}
func TestServer_Shutdown(t *testing.T) {
ctx := context.Background()
t.Run("successful shutdown", func(t *testing.T) {
ctx := context.Background()
s := testServer(t, newTestService(nil, false), newTestService(nil, true))
ch := make(chan error)
go func() {
defer close(ch)
err := s.managerAdapter.AwaitRunning(ctx)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err = s.Shutdown(ctx, "test interrupt")
ch <- err
}()
err := s.Run()
require.NoError(t, err)
s := testServer(t, newTestService(nil, false), newTestService(nil, true))
ch := make(chan error)
go func() {
defer close(ch)
// Wait until all services launched.
for _, svc := range s.backgroundServiceRegistry.GetServices() {
if !svc.(*testService).isDisabled {
<-svc.(*testService).started
}
}
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err := s.Shutdown(ctx, "test interrupt")
ch <- err
}()
err := s.Run()
require.NoError(t, err)
err = <-ch
require.NoError(t, err)
err = <-ch
require.NoError(t, err)
})
}

View File

@ -2027,16 +2027,6 @@ var (
Owner: grafanaPartnerPluginsSquad,
Expression: "false",
},
{
Name: "dskitBackgroundServices",
Description: "Enables dskit background service wrapper",
HideFromAdminPage: true,
HideFromDocs: true,
Stage: FeatureStageExperimental,
RequiresRestart: true,
Owner: grafanaPluginsPlatformSquad,
Expression: "false",
},
{
Name: "pluginContainers",
Description: "Enables running plugins in containers",

View File

@ -260,7 +260,6 @@ alertingTriage,experimental,@grafana/alerting-squad,false,false,true
graphiteBackendMode,privatePreview,@grafana/partner-datasources,false,false,false
azureResourcePickerUpdates,preview,@grafana/partner-datasources,false,false,true
prometheusTypeMigration,experimental,@grafana/partner-datasources,false,true,false
dskitBackgroundServices,experimental,@grafana/plugins-platform-backend,false,true,false
pluginContainers,privatePreview,@grafana/plugins-platform-backend,false,true,false
tempoSearchBackendMigration,GA,@grafana/oss-big-tent,false,true,false
filterOutBotsFromFrontendLogs,experimental,@grafana/plugins-platform-backend,false,false,true

1 Name Stage Owner requiresDevMode RequiresRestart FrontendOnly
260 graphiteBackendMode privatePreview @grafana/partner-datasources false false false
261 azureResourcePickerUpdates preview @grafana/partner-datasources false false true
262 prometheusTypeMigration experimental @grafana/partner-datasources false true false
dskitBackgroundServices experimental @grafana/plugins-platform-backend false true false
263 pluginContainers privatePreview @grafana/plugins-platform-backend false true false
264 tempoSearchBackendMigration GA @grafana/oss-big-tent false true false
265 filterOutBotsFromFrontendLogs experimental @grafana/plugins-platform-backend false false true

View File

@ -1051,10 +1051,6 @@ const (
// Checks for deprecated Prometheus authentication methods (SigV4 and Azure), installs the relevant data source, and migrates the Prometheus data sources
FlagPrometheusTypeMigration = "prometheusTypeMigration"
// FlagDskitBackgroundServices
// Enables dskit background service wrapper
FlagDskitBackgroundServices = "dskitBackgroundServices"
// FlagPluginContainers
// Enables running plugins in containers
FlagPluginContainers = "pluginContainers"

View File

@ -1245,6 +1245,7 @@
"name": "dskitBackgroundServices",
"resourceVersion": "1757339637779",
"creationTimestamp": "2025-09-03T12:20:24Z",
"deletionTimestamp": "2025-09-17T12:19:32Z",
"annotations": {
"grafana.app/updatedTimestamp": "2025-09-08 13:53:57.77994 +0000 UTC"
}