diff --git a/pkg/cmd/grafana-server/main.go b/pkg/cmd/grafana-server/main.go index 0193d8e95ea..cf0fdf58015 100644 --- a/pkg/cmd/grafana-server/main.go +++ b/pkg/cmd/grafana-server/main.go @@ -194,6 +194,7 @@ func listenToSystemSignals(s *server.Server) { } case sig := <-signalChan: s.Shutdown(fmt.Sprintf("System signal: %s", sig)) + return } } } diff --git a/pkg/server/server.go b/pkg/server/server.go index 29e8a216b8c..fcd5132e6ce 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -57,16 +57,40 @@ type Config struct { Listener net.Listener } +type serviceRegistry interface { + IsDisabled(srv registry.Service) bool + GetServices() []*registry.Descriptor +} + +type globalServiceRegistry struct{} + +func (r *globalServiceRegistry) IsDisabled(srv registry.Service) bool { + return registry.IsDisabled(srv) +} + +func (r *globalServiceRegistry) GetServices() []*registry.Descriptor { + return registry.GetServices() +} + // New returns a new instance of Server. func New(cfg Config) (*Server, error) { + s := newServer(cfg) + if err := s.init(); err != nil { + return nil, err + } + return s, nil +} + +func newServer(cfg Config) *Server { rootCtx, shutdownFn := context.WithCancel(context.Background()) childRoutines, childCtx := errgroup.WithContext(rootCtx) - s := &Server{ - context: childCtx, - shutdownFn: shutdownFn, - childRoutines: childRoutines, - log: log.New("server"), + return &Server{ + context: childCtx, + shutdownFn: shutdownFn, + shutdownFinished: make(chan struct{}), + childRoutines: childRoutines, + log: log.New("server"), // Need to use the singleton setting.Cfg instance, to make sure we use the same as is injected in the DI // graph cfg: setting.GetCfg(), @@ -77,28 +101,24 @@ func New(cfg Config) (*Server, error) { version: cfg.Version, commit: cfg.Commit, buildBranch: cfg.BuildBranch, - listener: cfg.Listener, - } - if err := s.init(); err != nil { - return nil, err + serviceRegistry: &globalServiceRegistry{}, + listener: cfg.Listener, } - - return s, nil } // Server is responsible for managing the lifecycle of services. type Server struct { - context context.Context - shutdownFn context.CancelFunc - childRoutines *errgroup.Group - log log.Logger - cfg *setting.Cfg - shutdownReason string - shutdownInProgress bool - isInitialized bool - mtx sync.Mutex - listener net.Listener + context context.Context + shutdownFn context.CancelFunc + childRoutines *errgroup.Group + log log.Logger + cfg *setting.Cfg + shutdownOnce sync.Once + shutdownFinished chan struct{} + isInitialized bool + mtx sync.Mutex + listener net.Listener configFile string homePath string @@ -107,6 +127,8 @@ type Server struct { commit string buildBranch string + serviceRegistry serviceRegistry + HTTPServer *api.HTTPServer `inject:""` } @@ -129,7 +151,7 @@ func (s *Server) init() error { login.Init() social.NewOAuthService() - services := registry.GetServices() + services := s.serviceRegistry.GetServices() if err := s.buildServiceGraph(services); err != nil { return err } @@ -150,12 +172,14 @@ func (s *Server) init() error { // Run initializes and starts services. This will block until all services have // exited. To initiate shutdown, call the Shutdown method in another goroutine. -func (s *Server) Run() (err error) { - if err = s.init(); err != nil { - return +func (s *Server) Run() error { + defer close(s.shutdownFinished) + + if err := s.init(); err != nil { + return err } - services := registry.GetServices() + services := s.serviceRegistry.GetServices() // Start background services. for _, svc := range services { @@ -164,78 +188,60 @@ func (s *Server) Run() (err error) { continue } - if registry.IsDisabled(svc.Instance) { + if s.serviceRegistry.IsDisabled(svc.Instance) { continue } // Variable is needed for accessing loop variable in callback descriptor := svc s.childRoutines.Go(func() error { - // Don't start new services when server is shutting down. - if s.shutdownInProgress { - return nil + select { + case <-s.context.Done(): + return s.context.Err() + default: } - err := service.Run(s.context) - if err != nil { - // Mark that we are in shutdown mode - // So no more services are started - s.shutdownInProgress = true - if !errors.Is(err, context.Canceled) { - // Server has crashed. - s.log.Error("Stopped "+descriptor.Name, "reason", err) - } else { - s.log.Debug("Stopped "+descriptor.Name, "reason", err) - } - - return err + // Do not return context.Canceled error since errgroup.Group only + // returns the first error to the caller - thus we can miss a more + // interesting error. + if err != nil && !errors.Is(err, context.Canceled) { + s.log.Error("Stopped "+descriptor.Name, "reason", err) + return fmt.Errorf("%s run error: %w", descriptor.Name, err) } - + s.log.Debug("Stopped "+descriptor.Name, "reason", err) return nil }) } - defer func() { - s.log.Debug("Waiting on services...") - if waitErr := s.childRoutines.Wait(); waitErr != nil && !errors.Is(waitErr, context.Canceled) { - s.log.Error("A service failed", "err", waitErr) - if err == nil { - err = waitErr - } - } - }() - s.notifySystemd("READY=1") - return nil + s.log.Debug("Waiting on services...") + return s.childRoutines.Wait() } +// Shutdown initiates Grafana graceful shutdown. This shuts down all +// running background services. Since Run blocks Shutdown supposed to +// be run from a separate goroutine. func (s *Server) Shutdown(reason string) { - s.log.Info("Shutdown started", "reason", reason) - s.shutdownReason = reason - s.shutdownInProgress = true - - // call cancel func on root context - s.shutdownFn() - - // wait for child routines - if err := s.childRoutines.Wait(); err != nil && !errors.Is(err, context.Canceled) { - s.log.Error("Failed waiting for services to shutdown", "err", err) - } + s.shutdownOnce.Do(func() { + s.log.Info("Shutdown started", "reason", reason) + // Call cancel func to stop services. + s.shutdownFn() + // Can introduce termination timeout here if needed over incoming Context, + // but this will require changing Shutdown method signature to accept context + // and return an error - caller can exit with code > 0 then. + // I.e. sth like server.Shutdown(context.WithTimeout(...), reason). + <-s.shutdownFinished + }) } // ExitCode returns an exit code for a given error. -func (s *Server) ExitCode(reason error) int { - code := 1 - - if errors.Is(reason, context.Canceled) && s.shutdownReason != "" { - reason = fmt.Errorf(s.shutdownReason) - code = 0 +func (s *Server) ExitCode(runError error) int { + if runError != nil { + s.log.Error("Server shutdown", "error", runError) + return 1 } - - s.log.Error("Server shutdown", "reason", reason) - - return code + return 0 } // writePIDFile retrieves the current process ID and writes it to file. @@ -283,7 +289,7 @@ func (s *Server) loadConfiguration() { } if err := s.cfg.Load(args); err != nil { - fmt.Fprintf(os.Stderr, "Failed to start grafana. error: %s\n", err.Error()) + _, _ = fmt.Fprintf(os.Stderr, "Failed to start grafana. error: %s\n", err.Error()) os.Exit(1) } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go new file mode 100644 index 00000000000..0e112165f22 --- /dev/null +++ b/pkg/server/server_test.go @@ -0,0 +1,113 @@ +package server + +import ( + "context" + "errors" + "testing" + + "github.com/grafana/grafana/pkg/registry" + + "github.com/stretchr/testify/require" +) + +type testServiceRegistry struct { + services []*registry.Descriptor +} + +func (r *testServiceRegistry) GetServices() []*registry.Descriptor { + return r.services +} + +func (r *testServiceRegistry) IsDisabled(_ registry.Service) bool { + return false +} + +type testService struct { + started chan struct{} + initErr error + runErr error +} + +func newTestService(initErr, runErr error) *testService { + return &testService{ + started: make(chan struct{}), + initErr: initErr, + runErr: runErr, + } +} + +func (s *testService) Init() error { + return s.initErr +} + +func (s *testService) Run(ctx context.Context) error { + if s.runErr != nil { + return s.runErr + } + close(s.started) + <-ctx.Done() + return ctx.Err() +} + +func testServer() *Server { + s := newServer(Config{}) + // Required to skip configuration initialization that causes + // DI errors in this test. + s.isInitialized = true + return s +} + +func TestServer_Run_Error(t *testing.T) { + s := testServer() + + var testErr = errors.New("boom") + + s.serviceRegistry = &testServiceRegistry{ + services: []*registry.Descriptor{ + { + Name: "TestService1", + Instance: newTestService(nil, nil), + InitPriority: registry.High, + }, + { + Name: "TestService2", + Instance: newTestService(nil, testErr), + InitPriority: registry.High, + }, + }, + } + + err := s.Run() + require.ErrorIs(t, err, testErr) + require.NotZero(t, s.ExitCode(err)) +} + +func TestServer_Shutdown(t *testing.T) { + s := testServer() + services := []*registry.Descriptor{ + { + Name: "TestService1", + Instance: newTestService(nil, nil), + InitPriority: registry.High, + }, + { + Name: "TestService2", + Instance: newTestService(nil, nil), + InitPriority: registry.High, + }, + } + s.serviceRegistry = &testServiceRegistry{ + services: services, + } + + go func() { + // Wait until all services launched. + for _, svc := range services { + <-svc.Instance.(*testService).started + } + s.Shutdown("test interrupt") + }() + err := s.Run() + require.NoError(t, err) + require.Zero(t, s.ExitCode(err)) +} diff --git a/pkg/tests/testinfra/testinfra.go b/pkg/tests/testinfra/testinfra.go index 1067276bbf0..5e419cf0bb5 100644 --- a/pkg/tests/testinfra/testinfra.go +++ b/pkg/tests/testinfra/testinfra.go @@ -56,7 +56,7 @@ func StartGrafana(t *testing.T, grafDir, cfgPath string, sqlStore *sqlstore.SQLS } }() t.Cleanup(func() { - server.Shutdown("") + server.Shutdown("test cleanup") }) // Wait for Grafana to be ready