mirror of https://github.com/grafana/grafana.git
				
				
				
			fix(unified-storage): fix graceful termination for grafana target servers (#103520)
This commit is contained in:
		
							parent
							
								
									5c53d33c8e
								
							
						
					
					
						commit
						56b4e5670d
					
				|  | @ -1117,8 +1117,8 @@ github.com/grafana/grafana/pkg/storage/unified/apistore v0.0.0-20250121113133-e7 | |||
| github.com/grafana/grafana/pkg/storage/unified/apistore v0.0.0-20250317130411-3f270d1de043/go.mod h1:usON2sfgh4qjGs4GLhH6+PL7Q6g5ezOP6M/9vOeHpAM= | ||||
| github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw= | ||||
| github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3/go.mod h1:YeND+6FDA7OuFgDzYODN8kfPhXLCehcpxe4T9mdnpCY= | ||||
| github.com/grafana/prometheus-alertmanager v0.25.1-0.20250403152731-b0a305952f0c h1:KH22wMjV61ysp8igdHpuP3zJYsaaIL/GJj4y67Vr/7M= | ||||
| github.com/grafana/prometheus-alertmanager v0.25.1-0.20250403152731-b0a305952f0c/go.mod h1:FGdGvhI40Dq+CTQaSzK9evuve774cgOUdGfVO04OXkw= | ||||
| github.com/grafana/prometheus-alertmanager v0.25.1-0.20250331083058-4563aec7a975 h1:4/BZkGObFWZf4cLbE2Vqg/1VTz67Q0AJ7LHspWLKJoQ= | ||||
| github.com/grafana/prometheus-alertmanager v0.25.1-0.20250331083058-4563aec7a975/go.mod h1:FGdGvhI40Dq+CTQaSzK9evuve774cgOUdGfVO04OXkw= | ||||
| github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU= | ||||
| github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0/go.mod h1:7t5XR+2IA8P2qggOAHTj/GCZfoLBle3OvNSYh1VkRBU= | ||||
| github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA= | ||||
|  |  | |||
|  | @ -100,8 +100,9 @@ func (m *service) Run(ctx context.Context) error { | |||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	err = m.serviceManager.AwaitStopped(ctx) | ||||
| 	if err != nil { | ||||
| 	stopCtx := context.Background() | ||||
| 	if err = m.serviceManager.AwaitStopped(stopCtx); err != nil { | ||||
| 		m.log.Error("Failed to stop module service manager", "error", err) | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
|  |  | |||
|  | @ -2,18 +2,15 @@ package server | |||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"net/http" | ||||
| 	"os" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"cuelang.org/go/pkg/regexp" | ||||
| 	"github.com/grafana/grafana/pkg/api" | ||||
| 	"github.com/grafana/grafana/pkg/infra/db" | ||||
| 	"github.com/grafana/grafana/pkg/modules" | ||||
| 	"github.com/grafana/grafana/pkg/setting" | ||||
| 	"github.com/grafana/grafana/pkg/tests/testsuite" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/stretchr/testify/require" | ||||
| ) | ||||
| 
 | ||||
|  | @ -25,68 +22,47 @@ func TestIntegrationWillRunInstrumentationServerWhenTargetHasNoHttpServer(t *tes | |||
| 	if testing.Short() { | ||||
| 		t.Skip("skipping integration test") | ||||
| 	} | ||||
| 	dbType := os.Getenv("GRAFANA_TEST_DB") | ||||
| 	if dbType == "" { | ||||
| 		t.Skip("skipping - GRAFANA_TEST_DB not defined") | ||||
| 	} | ||||
| 	if dbType == "sqlite3" { | ||||
| 		t.Skip("skipping - sqlite not supported for storage server target") | ||||
| 	} | ||||
| 	// TODO - fix this test for postgres
 | ||||
| 	if dbType == "postgres" { | ||||
| 		t.Skip("skipping - test not working with postgres in Drone. Works locally.") | ||||
| 	if db.IsTestDbSQLite() { | ||||
| 		t.Skip("sqlite is not supported by the storage server target") | ||||
| 	} | ||||
| 
 | ||||
| 	_, cfg := db.InitTestDBWithCfg(t) | ||||
| 	cfg.HTTPPort = "3001" | ||||
| 	cfg.GRPCServer.Network = "tcp" | ||||
| 	cfg.GRPCServer.Address = "localhost:10000" | ||||
| 	addStorageServerToConfig(t, cfg, dbType) | ||||
| 	cfg.Target = []string{modules.StorageServer} | ||||
| 
 | ||||
| 	ms, err := InitializeModuleServer(cfg, Options{}, api.ServerOptions{}) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	errChan := make(chan error, 1) | ||||
| 	go func() { | ||||
| 		err = ms.Run() | ||||
| 		if err.Error() != "context canceled" { | ||||
| 			t.Error(err) | ||||
| 		} | ||||
| 		errChan <- ms.Run() | ||||
| 	}() | ||||
| 	time.Sleep(500 * time.Millisecond) // wait for http server to be running
 | ||||
| 
 | ||||
| 	require.Eventually(t, func() bool { | ||||
| 		client := http.Client{} | ||||
| 		res, err := client.Get("http://localhost:3001/metrics") | ||||
| 	require.NoError(t, err) | ||||
| 	err = res.Body.Close() | ||||
| 	require.NoError(t, err) | ||||
| 	assert.Equal(t, 200, res.StatusCode) | ||||
| 		if err != nil { | ||||
| 			return false | ||||
| 		} | ||||
| 		defer func() { | ||||
| 			if err := res.Body.Close(); err != nil { | ||||
| 				t.Fatalf("failed to close response body: %v", err) | ||||
| 			} | ||||
| 		}() | ||||
| 		return res.StatusCode == http.StatusOK | ||||
| 	}, 10*time.Second, 1*time.Second) | ||||
| 
 | ||||
| 	err = ms.Shutdown(context.Background(), "test over") | ||||
| 	require.NoError(t, err) | ||||
| } | ||||
| 
 | ||||
| func addStorageServerToConfig(t *testing.T, cfg *setting.Cfg, dbType string) { | ||||
| 	s, err := cfg.Raw.NewSection("resource_api") | ||||
| 	require.NoError(t, err) | ||||
| 	_, err = s.NewKey("db_type", dbType) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	if dbType == "postgres" { | ||||
| 		_, _ = s.NewKey("db_host", "localhost") | ||||
| 		_, _ = s.NewKey("db_name", "grafanatest") | ||||
| 		_, _ = s.NewKey("db_user", "grafanatest") | ||||
| 		_, _ = s.NewKey("db_pass", "grafanatest") | ||||
| 	} else { | ||||
| 		// cant use localhost as hostname in drone tests for mysql, so need to parse it from connection string
 | ||||
| 		sec, err := cfg.Raw.GetSection("database") | ||||
| 		require.NoError(t, err) | ||||
| 		connString := sec.Key("connection_string").String() | ||||
| 		matches, err := regexp.FindSubmatch("(.+):(.+)@tcp\\((.+):(\\d+)\\)/(.+)\\?", connString) | ||||
| 		require.NoError(t, err) | ||||
| 		_, _ = s.NewKey("db_host", matches[3]) | ||||
| 		_, _ = s.NewKey("db_name", matches[5]) | ||||
| 		_, _ = s.NewKey("db_user", matches[1]) | ||||
| 		_, _ = s.NewKey("db_pass", matches[2]) | ||||
| 	select { | ||||
| 	case err := <-errChan: | ||||
| 		if err != nil && !errors.Is(err, context.Canceled) { | ||||
| 			t.Fatalf("unexpected error from module server: %v", err) | ||||
| 		} | ||||
| 	case <-time.After(10 * time.Second): | ||||
| 		t.Fatal("timeout waiting for module server to shut down") | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ package sql | |||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"sync" | ||||
| 
 | ||||
|  | @ -100,7 +101,7 @@ func ProvideUnifiedStorageGrpcService( | |||
| 	} | ||||
| 
 | ||||
| 	// This will be used when running as a dskit service
 | ||||
| 	s.BasicService = services.NewBasicService(s.start, s.running, nil).WithName(modules.StorageServer) | ||||
| 	s.BasicService = services.NewBasicService(s.start, s.running, s.stopping).WithName(modules.StorageServer) | ||||
| 
 | ||||
| 	return s, nil | ||||
| } | ||||
|  | @ -258,3 +259,11 @@ func NewAuthenticatorWithFallback(cfg *setting.Cfg, reg prometheus.Registerer, t | |||
| 		return a.Authenticate(ctx) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (s *service) stopping(err error) error { | ||||
| 	if err != nil && !errors.Is(err, context.Canceled) { | ||||
| 		s.log.Error("stopping unified storage grpc service", "error", err) | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue