mirror of https://github.com/grafana/grafana.git
Plugins: Move store init to dskit service (#111206)
This commit is contained in:
parent
0a06183d84
commit
4cff7237d0
|
@ -12,19 +12,18 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/plugins/auth"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/dtos"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/log/logtest"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/auth"
|
||||
"github.com/grafana/grafana/pkg/plugins/config"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/fakes"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/filestore"
|
||||
|
@ -528,9 +527,12 @@ func callGetPluginAsset(sc *scenarioContext) {
|
|||
func pluginAssetScenario(t *testing.T, desc string, url string, urlPattern string,
|
||||
cfg *setting.Cfg, pluginRegistry registry.Service, fn scenarioFunc) {
|
||||
t.Run(fmt.Sprintf("%s %s", desc, url), func(t *testing.T) {
|
||||
store, err := pluginstore.NewPluginStoreForTest(pluginRegistry, &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
|
||||
hs := HTTPServer{
|
||||
Cfg: cfg,
|
||||
pluginStore: pluginstore.New(pluginRegistry, &fakes.FakeLoader{}),
|
||||
pluginStore: store,
|
||||
pluginFileStore: filestore.ProvideService(pluginRegistry),
|
||||
log: log.NewNopLogger(),
|
||||
pluginsCDNService: pluginscdn.ProvideService(&config.PluginManagementCfg{
|
||||
|
@ -640,12 +642,14 @@ func Test_PluginsList_AccessControl(t *testing.T) {
|
|||
for _, tc := range tcs {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
server := SetupAPITestServer(t, func(hs *HTTPServer) {
|
||||
store, err := pluginstore.NewPluginStoreForTest(pluginRegistry, &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
|
||||
hs.Cfg = setting.NewCfg()
|
||||
hs.PluginSettings = &pluginSettings
|
||||
hs.pluginStore = pluginstore.New(pluginRegistry, &fakes.FakeLoader{})
|
||||
hs.pluginStore = store
|
||||
hs.pluginFileStore = filestore.ProvideService(pluginRegistry)
|
||||
hs.managedPluginsService = managedplugins.NewNoop()
|
||||
var err error
|
||||
hs.pluginsUpdateChecker, err = updatemanager.ProvidePluginsService(
|
||||
hs.Cfg,
|
||||
hs.pluginStore,
|
||||
|
@ -828,9 +832,12 @@ func Test_PluginsSettings(t *testing.T) {
|
|||
for _, tc := range tcs {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
server := SetupAPITestServer(t, func(hs *HTTPServer) {
|
||||
store, err := pluginstore.NewPluginStoreForTest(pluginRegistry, &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
|
||||
hs.Cfg = setting.NewCfg()
|
||||
hs.PluginSettings = &pluginSettings
|
||||
hs.pluginStore = pluginstore.New(pluginRegistry, &fakes.FakeLoader{})
|
||||
hs.pluginStore = store
|
||||
hs.pluginFileStore = filestore.ProvideService(pluginRegistry)
|
||||
errTracker := pluginerrs.ProvideErrorTracker()
|
||||
if tc.errCode != "" {
|
||||
|
@ -844,7 +851,6 @@ func Test_PluginsSettings(t *testing.T) {
|
|||
sig := signature.ProvideService(pCfg, statickey.New())
|
||||
hs.pluginAssets = pluginassets.ProvideService(pCfg, pluginCDN, sig, hs.pluginStore)
|
||||
hs.pluginErrorResolver = pluginerrs.ProvideStore(errTracker)
|
||||
var err error
|
||||
hs.pluginsUpdateChecker, err = updatemanager.ProvidePluginsService(
|
||||
hs.Cfg,
|
||||
hs.pluginStore,
|
||||
|
@ -896,9 +902,12 @@ func Test_UpdatePluginSetting(t *testing.T) {
|
|||
|
||||
t.Run("should return an error when trying to disable an auto-enabled plugin", func(t *testing.T) {
|
||||
server := SetupAPITestServer(t, func(hs *HTTPServer) {
|
||||
store, err := pluginstore.NewPluginStoreForTest(pluginRegistry, &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
|
||||
hs.Cfg = setting.NewCfg()
|
||||
hs.PluginSettings = &pluginSettings
|
||||
hs.pluginStore = pluginstore.New(pluginRegistry, &fakes.FakeLoader{})
|
||||
hs.pluginStore = store
|
||||
hs.pluginFileStore = filestore.ProvideService(pluginRegistry)
|
||||
hs.managedPluginsService = managedplugins.NewNoop()
|
||||
hs.log = log.NewNopLogger()
|
||||
|
|
|
@ -3,7 +3,9 @@ package datasource
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
@ -20,14 +22,16 @@ import (
|
|||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
datasource "github.com/grafana/grafana/pkg/apis/datasource/v0alpha1"
|
||||
query "github.com/grafana/grafana/pkg/apis/query/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/configprovider"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/sources"
|
||||
"github.com/grafana/grafana/pkg/promlib/models"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/query/queryschema"
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb/grafana-testdata-datasource/kinds"
|
||||
)
|
||||
|
||||
|
@ -47,12 +51,12 @@ type DataSourceAPIBuilder struct {
|
|||
}
|
||||
|
||||
func RegisterAPIService(
|
||||
cfgProvider configprovider.ConfigProvider,
|
||||
features featuremgmt.FeatureToggles,
|
||||
apiRegistrar builder.APIRegistrar,
|
||||
pluginClient plugins.Client, // access to everything
|
||||
datasources ScopedPluginDatasourceProvider,
|
||||
contextProvider PluginContextWrapper,
|
||||
pluginStore pluginstore.Store,
|
||||
accessControl accesscontrol.AccessControl,
|
||||
reg prometheus.Registerer,
|
||||
) (*DataSourceAPIBuilder, error) {
|
||||
|
@ -66,25 +70,43 @@ func RegisterAPIService(
|
|||
|
||||
var err error
|
||||
var builder *DataSourceAPIBuilder
|
||||
all := pluginStore.Plugins(context.Background(), plugins.TypeDataSource)
|
||||
|
||||
cfg, err := cfgProvider.Get(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pluginJSONs, err := getCorePlugins(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ids := []string{
|
||||
"grafana-testdata-datasource",
|
||||
"prometheus",
|
||||
"graphite",
|
||||
}
|
||||
|
||||
for _, ds := range all {
|
||||
if explictPluginList && !slices.Contains(ids, ds.ID) {
|
||||
for _, pluginJSON := range pluginJSONs {
|
||||
if explictPluginList && !slices.Contains(ids, pluginJSON.ID) {
|
||||
continue // skip this one
|
||||
}
|
||||
|
||||
if !ds.Backend {
|
||||
if !pluginJSON.Backend {
|
||||
continue // skip frontend only plugins
|
||||
}
|
||||
|
||||
builder, err = NewDataSourceAPIBuilder(ds.JSONData,
|
||||
pluginClient,
|
||||
datasources.GetDatasourceProvider(ds.JSONData),
|
||||
if pluginJSON.Type != plugins.TypeDataSource {
|
||||
continue // skip non-datasource plugins
|
||||
}
|
||||
|
||||
client, ok := pluginClient.(PluginClient)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("plugin client is not a PluginClient: %T", pluginClient)
|
||||
}
|
||||
|
||||
builder, err = NewDataSourceAPIBuilder(pluginJSON,
|
||||
client,
|
||||
datasources.GetDatasourceProvider(pluginJSON),
|
||||
contextProvider,
|
||||
accessControl,
|
||||
features.IsEnabledGlobally(featuremgmt.FlagDatasourceQueryTypes),
|
||||
|
@ -277,3 +299,22 @@ func (b *DataSourceAPIBuilder) PostProcessOpenAPI(oas *spec3.OpenAPI) (*spec3.Op
|
|||
|
||||
return oas, err
|
||||
}
|
||||
|
||||
func getCorePlugins(cfg *setting.Cfg) ([]plugins.JSONData, error) {
|
||||
coreDataSourcesPath := filepath.Join(cfg.StaticRootPath, "app", "plugins", "datasource")
|
||||
coreDataSourcesSrc := sources.NewLocalSource(
|
||||
plugins.ClassCore,
|
||||
[]string{coreDataSourcesPath},
|
||||
)
|
||||
|
||||
res, err := coreDataSourcesSrc.Discover(context.Background())
|
||||
if err != nil {
|
||||
return nil, errors.New("failed to load core data source plugins")
|
||||
}
|
||||
|
||||
pluginJSONs := make([]plugins.JSONData, 0, len(res))
|
||||
for _, p := range res {
|
||||
pluginJSONs = append(pluginJSONs, p.Primary.JSONData)
|
||||
}
|
||||
return pluginJSONs, nil
|
||||
}
|
||||
|
|
|
@ -3,9 +3,13 @@ package adapter
|
|||
import (
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/modules"
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
|
||||
)
|
||||
|
||||
const (
|
||||
// PluginStore is the module name for the plugin store service.
|
||||
PluginStore = pluginstore.ServiceName
|
||||
|
||||
// Tracing is the module name for the tracing service.
|
||||
Tracing = tracing.ServiceName
|
||||
|
||||
|
@ -29,7 +33,8 @@ func dependencyMap() map[string][]string {
|
|||
return map[string][]string{
|
||||
Tracing: {},
|
||||
GrafanaAPIServer: {Tracing},
|
||||
Core: {GrafanaAPIServer},
|
||||
PluginStore: {GrafanaAPIServer},
|
||||
Core: {GrafanaAPIServer, PluginStore},
|
||||
BackgroundServices: {Core},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -547,10 +547,7 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
|
|||
}
|
||||
errorRegistry := pluginerrs.ProvideErrorTracker()
|
||||
loaderLoader := loader.ProvideService(pluginManagementCfg, discovery, bootstrap, validate, initialize, terminate, errorRegistry)
|
||||
pluginstoreService, err := pluginstore.ProvideService(inMemory, sourcesService, loaderLoader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pluginstoreService := pluginstore.ProvideService(inMemory, sourcesService, loaderLoader)
|
||||
filestoreService := filestore.ProvideService(inMemory)
|
||||
fileStoreManager := dashboards.ProvideFileStoreManager(pluginstoreService, filestoreService)
|
||||
folderPermissionsService, err := ossaccesscontrol.ProvideFolderPermissions(cfg, featureToggles, routeRegisterImpl, sqlStore, accessControl, ossLicensingService, folderimplService, acimplService, teamService, userService, actionSetService)
|
||||
|
@ -810,7 +807,7 @@ func Initialize(ctx context.Context, cfg *setting.Cfg, opts Options, apiOpts api
|
|||
apiService := api4.ProvideService(cfg, routeRegisterImpl, accessControl, userService, authinfoimplService, ossGroups, identitySynchronizer, orgService, ldapImpl, userAuthTokenService, bundleregistryService)
|
||||
dashboardsAPIBuilder := dashboard.RegisterAPIService(cfg, featureToggles, apiserverService, dashboardService, dashboardProvisioningService, service15, dashboardServiceImpl, dashboardPermissionsService, accessControl, accessClient, provisioningServiceImpl, dashboardsStore, registerer, sqlStore, tracingService, resourceClient, dualwriteService, sortService, quotaService, libraryPanelService, eventualRestConfigProvider, userService)
|
||||
snapshotsAPIBuilder := dashboardsnapshot.RegisterAPIService(serviceImpl, apiserverService, cfg, featureToggles, sqlStore, registerer)
|
||||
dataSourceAPIBuilder, err := datasource.RegisterAPIService(featureToggles, apiserverService, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, accessControl, registerer)
|
||||
dataSourceAPIBuilder, err := datasource.RegisterAPIService(configProvider, featureToggles, apiserverService, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, accessControl, registerer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1152,10 +1149,7 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
|
|||
}
|
||||
errorRegistry := pluginerrs.ProvideErrorTracker()
|
||||
loaderLoader := loader.ProvideService(pluginManagementCfg, discovery, bootstrap, validate, initialize, terminate, errorRegistry)
|
||||
pluginstoreService, err := pluginstore.ProvideService(inMemory, sourcesService, loaderLoader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pluginstoreService := pluginstore.ProvideService(inMemory, sourcesService, loaderLoader)
|
||||
filestoreService := filestore.ProvideService(inMemory)
|
||||
fileStoreManager := dashboards.ProvideFileStoreManager(pluginstoreService, filestoreService)
|
||||
folderPermissionsService, err := ossaccesscontrol.ProvideFolderPermissions(cfg, featureToggles, routeRegisterImpl, sqlStore, accessControl, ossLicensingService, folderimplService, acimplService, teamService, userService, actionSetService)
|
||||
|
@ -1417,7 +1411,7 @@ func InitializeForTest(ctx context.Context, t sqlutil.ITestDB, testingT interfac
|
|||
apiService := api4.ProvideService(cfg, routeRegisterImpl, accessControl, userService, authinfoimplService, ossGroups, identitySynchronizer, orgService, ldapImpl, userAuthTokenService, bundleregistryService)
|
||||
dashboardsAPIBuilder := dashboard.RegisterAPIService(cfg, featureToggles, apiserverService, dashboardService, dashboardProvisioningService, service15, dashboardServiceImpl, dashboardPermissionsService, accessControl, accessClient, provisioningServiceImpl, dashboardsStore, registerer, sqlStore, tracingService, resourceClient, dualwriteService, sortService, quotaService, libraryPanelService, eventualRestConfigProvider, userService)
|
||||
snapshotsAPIBuilder := dashboardsnapshot.RegisterAPIService(serviceImpl, apiserverService, cfg, featureToggles, sqlStore, registerer)
|
||||
dataSourceAPIBuilder, err := datasource.RegisterAPIService(featureToggles, apiserverService, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, pluginstoreService, accessControl, registerer)
|
||||
dataSourceAPIBuilder, err := datasource.RegisterAPIService(configProvider, featureToggles, apiserverService, middlewareHandler, scopedPluginDatasourceProvider, plugincontextProvider, accessControl, registerer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -41,8 +41,10 @@ func TestGet(t *testing.T) {
|
|||
cfg := setting.NewCfg()
|
||||
ds := &fakeDatasources.FakeDataSourceService{}
|
||||
db := &dbtest.FakeDB{ExpectedError: pluginsettings.ErrPluginSettingNotFound}
|
||||
store, err := pluginstore.NewPluginStoreForTest(preg, &pluginFakes.FakeLoader{}, &pluginFakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
pcp := plugincontext.ProvideService(cfg, localcache.ProvideService(),
|
||||
pluginstore.New(preg, &pluginFakes.FakeLoader{}), &fakeDatasources.FakeCacheService{},
|
||||
store, &fakeDatasources.FakeCacheService{},
|
||||
ds, pluginSettings.ProvideService(db, secretstest.NewFakeSecretsService()), pluginconfig.NewFakePluginRequestConfigProvider(),
|
||||
)
|
||||
identity := &user.SignedInUser{OrgID: int64(1), Login: "admin"}
|
||||
|
|
|
@ -26,7 +26,7 @@ func TestService_IsDisabled(t *testing.T) {
|
|||
&setting.Cfg{
|
||||
PreinstallPluginsAsync: []setting.InstallPlugin{{ID: "myplugin"}},
|
||||
},
|
||||
pluginstore.New(registry.NewInMemory(), &fakes.FakeLoader{}),
|
||||
pluginstore.New(registry.NewInMemory(), &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{}),
|
||||
&fakes.FakePluginInstaller{},
|
||||
prometheus.NewRegistry(),
|
||||
&fakes.FakePluginRepo{},
|
||||
|
@ -160,12 +160,14 @@ func TestService_Run(t *testing.T) {
|
|||
}
|
||||
installed := 0
|
||||
installedFromURL := 0
|
||||
store, err := pluginstore.NewPluginStoreForTest(preg, &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
s, err := ProvideService(
|
||||
&setting.Cfg{
|
||||
PreinstallPluginsAsync: tt.pluginsToInstall,
|
||||
PreinstallPluginsSync: tt.pluginsToInstallSync,
|
||||
},
|
||||
pluginstore.New(preg, &fakes.FakeLoader{}),
|
||||
store,
|
||||
&fakes.FakePluginInstaller{
|
||||
AddFunc: func(ctx context.Context, pluginID string, version string, opts plugins.AddOpts) error {
|
||||
for _, plugin := range tt.pluginsToFail {
|
||||
|
|
|
@ -3,18 +3,21 @@ package pluginstore
|
|||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/dskit/services"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/loader"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/registry"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/sources"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var _ Store = (*Service)(nil)
|
||||
|
||||
const ServiceName = "plugins.store"
|
||||
|
||||
// Store is the publicly accessible storage for plugins.
|
||||
type Store interface {
|
||||
// Plugin finds a plugin by its ID.
|
||||
|
@ -25,47 +28,80 @@ type Store interface {
|
|||
}
|
||||
|
||||
type Service struct {
|
||||
services.NamedService
|
||||
|
||||
pluginRegistry registry.Service
|
||||
pluginLoader loader.Service
|
||||
pluginSources sources.Registry
|
||||
}
|
||||
|
||||
func ProvideService(pluginRegistry registry.Service, pluginSources sources.Registry,
|
||||
pluginLoader loader.Service) (*Service, error) {
|
||||
ctx := context.Background()
|
||||
start := time.Now()
|
||||
totalPlugins := 0
|
||||
logger := log.New("plugin.store")
|
||||
logger.Info("Loading plugins...")
|
||||
pluginLoader loader.Service) *Service {
|
||||
return New(pluginRegistry, pluginLoader, pluginSources)
|
||||
}
|
||||
|
||||
for _, ps := range pluginSources.List(ctx) {
|
||||
loadedPlugins, err := pluginLoader.Load(ctx, ps)
|
||||
if err != nil {
|
||||
logger.Error("Loading plugin source failed", "source", ps.PluginClass(ctx), "error", err)
|
||||
func (s *Service) Run(ctx context.Context) error {
|
||||
if err := s.StartAsync(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.AwaitTerminated(ctx)
|
||||
}
|
||||
|
||||
func NewPluginStoreForTest(pluginRegistry registry.Service, pluginLoader loader.Service, pluginSources sources.Registry) (*Service, error) {
|
||||
s := New(pluginRegistry, pluginLoader, pluginSources)
|
||||
if err := s.StartAsync(context.Background()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.AwaitRunning(context.Background()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func New(pluginRegistry registry.Service, pluginLoader loader.Service, pluginSources sources.Registry) *Service {
|
||||
s := &Service{
|
||||
pluginRegistry: pluginRegistry,
|
||||
pluginLoader: pluginLoader,
|
||||
pluginSources: pluginSources,
|
||||
}
|
||||
s.NamedService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(ServiceName)
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Service) starting(ctx context.Context) error {
|
||||
start := time.Now()
|
||||
totalPlugins := 0
|
||||
logger := log.New(ServiceName)
|
||||
logger.Info("Loading plugins...")
|
||||
|
||||
for _, ps := range s.pluginSources.List(ctx) {
|
||||
loadedPlugins, err := s.pluginLoader.Load(ctx, ps)
|
||||
if err != nil {
|
||||
logger.Error("Loading plugin source failed", "source", ps.PluginClass(ctx), "error", err)
|
||||
return err
|
||||
}
|
||||
totalPlugins += len(loadedPlugins)
|
||||
}
|
||||
|
||||
logger.Info("Plugins loaded", "count", totalPlugins, "duration", time.Since(start))
|
||||
|
||||
return New(pluginRegistry, pluginLoader), nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Run(ctx context.Context) error {
|
||||
func (s *Service) running(ctx context.Context) error {
|
||||
<-ctx.Done()
|
||||
s.shutdown(ctx)
|
||||
return ctx.Err()
|
||||
return nil
|
||||
}
|
||||
|
||||
func New(pluginRegistry registry.Service, pluginLoader loader.Service) *Service {
|
||||
return &Service{
|
||||
pluginRegistry: pluginRegistry,
|
||||
pluginLoader: pluginLoader,
|
||||
}
|
||||
func (s *Service) stopping(failureReason error) error {
|
||||
return s.shutdown(context.Background())
|
||||
}
|
||||
|
||||
func (s *Service) Plugin(ctx context.Context, pluginID string) (Plugin, bool) {
|
||||
if err := s.AwaitRunning(ctx); err != nil {
|
||||
log.New(ServiceName).FromContext(ctx).Error("Failed to get plugin", "error", err)
|
||||
return Plugin{}, false
|
||||
}
|
||||
p, exists := s.plugin(ctx, pluginID)
|
||||
if !exists {
|
||||
return Plugin{}, false
|
||||
|
@ -75,6 +111,10 @@ func (s *Service) Plugin(ctx context.Context, pluginID string) (Plugin, bool) {
|
|||
}
|
||||
|
||||
func (s *Service) Plugins(ctx context.Context, pluginTypes ...plugins.Type) []Plugin {
|
||||
if err := s.AwaitRunning(ctx); err != nil {
|
||||
log.New(ServiceName).FromContext(ctx).Error("Failed to get plugins", "error", err)
|
||||
return []Plugin{}
|
||||
}
|
||||
// if no types passed, assume all
|
||||
if len(pluginTypes) == 0 {
|
||||
pluginTypes = plugins.PluginTypes
|
||||
|
@ -125,6 +165,10 @@ func (s *Service) availablePlugins(ctx context.Context) []*plugins.Plugin {
|
|||
}
|
||||
|
||||
func (s *Service) Routes(ctx context.Context) []*plugins.StaticRoute {
|
||||
if err := s.AwaitRunning(ctx); err != nil {
|
||||
log.New(ServiceName).FromContext(ctx).Error("Failed to get routes", "error", err)
|
||||
return []*plugins.StaticRoute{}
|
||||
}
|
||||
staticRoutes := make([]*plugins.StaticRoute, 0)
|
||||
|
||||
for _, p := range s.availablePlugins(ctx) {
|
||||
|
@ -135,18 +179,20 @@ func (s *Service) Routes(ctx context.Context) []*plugins.StaticRoute {
|
|||
return staticRoutes
|
||||
}
|
||||
|
||||
func (s *Service) shutdown(ctx context.Context) {
|
||||
var wg sync.WaitGroup
|
||||
for _, plugin := range s.pluginRegistry.Plugins(ctx) {
|
||||
wg.Add(1)
|
||||
go func(ctx context.Context, p *plugins.Plugin) {
|
||||
defer wg.Done()
|
||||
p.Logger().Debug("Stopping plugin")
|
||||
if _, err := s.pluginLoader.Unload(ctx, p); err != nil {
|
||||
p.Logger().Error("Failed to stop plugin", "error", err)
|
||||
func (s *Service) shutdown(ctx context.Context) error {
|
||||
var errgroup errgroup.Group
|
||||
plugins := s.pluginRegistry.Plugins(ctx)
|
||||
for _, p := range plugins {
|
||||
plugin := p // capture loop variable
|
||||
errgroup.Go(func() error {
|
||||
plugin.Logger().Debug("Stopping plugin")
|
||||
if _, err := s.pluginLoader.Unload(ctx, plugin); err != nil {
|
||||
plugin.Logger().Error("Failed to stop plugin", "error", err)
|
||||
return err
|
||||
}
|
||||
p.Logger().Debug("Plugin stopped")
|
||||
}(ctx, plugin)
|
||||
plugin.Logger().Debug("Plugin stopped")
|
||||
return nil
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
return errgroup.Wait()
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ package pluginstore
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -43,7 +43,11 @@ func TestStore_ProvideService(t *testing.T) {
|
|||
}
|
||||
}}
|
||||
|
||||
_, err := ProvideService(fakes.NewFakePluginRegistry(), srcs, l)
|
||||
service := ProvideService(fakes.NewFakePluginRegistry(), srcs, l)
|
||||
ctx := context.Background()
|
||||
err := service.StartAsync(ctx)
|
||||
require.NoError(t, err)
|
||||
err = service.AwaitRunning(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []plugins.Class{"1", "2", "3"}, loadedSrcs)
|
||||
})
|
||||
|
@ -55,12 +59,13 @@ func TestStore_Plugin(t *testing.T) {
|
|||
p1.RegisterClient(&DecommissionedPlugin{})
|
||||
p2 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-panel"}}
|
||||
|
||||
ps := New(&fakes.FakePluginRegistry{
|
||||
ps, err := NewPluginStoreForTest(&fakes.FakePluginRegistry{
|
||||
Store: map[string]*plugins.Plugin{
|
||||
p1.ID: p1,
|
||||
p2.ID: p2,
|
||||
},
|
||||
}, &fakes.FakeLoader{})
|
||||
}, &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
|
||||
p, exists := ps.Plugin(context.Background(), p1.ID)
|
||||
require.False(t, exists)
|
||||
|
@ -81,7 +86,7 @@ func TestStore_Plugins(t *testing.T) {
|
|||
p5 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "e-test-panel", Type: plugins.TypePanel}}
|
||||
p5.RegisterClient(&DecommissionedPlugin{})
|
||||
|
||||
ps := New(&fakes.FakePluginRegistry{
|
||||
ps, err := NewPluginStoreForTest(&fakes.FakePluginRegistry{
|
||||
Store: map[string]*plugins.Plugin{
|
||||
p1.ID: p1,
|
||||
p2.ID: p2,
|
||||
|
@ -89,7 +94,8 @@ func TestStore_Plugins(t *testing.T) {
|
|||
p4.ID: p4,
|
||||
p5.ID: p5,
|
||||
},
|
||||
}, &fakes.FakeLoader{})
|
||||
}, &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
|
||||
ToGrafanaDTO(p1)
|
||||
pss := ps.Plugins(context.Background())
|
||||
|
@ -124,7 +130,7 @@ func TestStore_Routes(t *testing.T) {
|
|||
p6 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "f-test-app", Type: plugins.TypeApp}}
|
||||
p6.RegisterClient(&DecommissionedPlugin{})
|
||||
|
||||
ps := New(&fakes.FakePluginRegistry{
|
||||
ps, err := NewPluginStoreForTest(&fakes.FakePluginRegistry{
|
||||
Store: map[string]*plugins.Plugin{
|
||||
p1.ID: p1,
|
||||
p2.ID: p2,
|
||||
|
@ -132,7 +138,8 @@ func TestStore_Routes(t *testing.T) {
|
|||
p5.ID: p5,
|
||||
p6.ID: p6,
|
||||
},
|
||||
}, &fakes.FakeLoader{})
|
||||
}, &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
|
||||
sr := func(p *plugins.Plugin) *plugins.StaticRoute {
|
||||
return &plugins.StaticRoute{PluginID: p.ID, Directory: p.FS.Base()}
|
||||
|
@ -144,6 +151,7 @@ func TestStore_Routes(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestProcessManager_shutdown(t *testing.T) {
|
||||
t.Run("When context is cancelled the plugin is stopped", func(t *testing.T) {
|
||||
p := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-datasource", Type: plugins.TypeDataSource}} // Backend: true
|
||||
backend := &fakes.FakeBackendPlugin{}
|
||||
p.RegisterClient(backend)
|
||||
|
@ -160,23 +168,45 @@ func TestProcessManager_shutdown(t *testing.T) {
|
|||
unloaded = true
|
||||
return nil, nil
|
||||
},
|
||||
}, &fakes.FakeSourceRegistry{})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
err := ps.StartAsync(ctx)
|
||||
require.NoError(t, err)
|
||||
err = ps.AwaitRunning(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Cancel context to trigger shutdown
|
||||
cancel()
|
||||
|
||||
// Wait for service to be fully terminated
|
||||
err = ps.AwaitTerminated(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.True(t, unloaded)
|
||||
})
|
||||
|
||||
pCtx := context.Background()
|
||||
cCtx, cancel := context.WithCancel(pCtx)
|
||||
var wgRun sync.WaitGroup
|
||||
wgRun.Add(1)
|
||||
var runErr error
|
||||
go func() {
|
||||
runErr = ps.Run(cCtx)
|
||||
wgRun.Done()
|
||||
}()
|
||||
t.Run("When shutdown fails, stopping method returns error", func(t *testing.T) {
|
||||
p := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-datasource", Type: plugins.TypeDataSource}}
|
||||
backend := &fakes.FakeBackendPlugin{}
|
||||
p.RegisterClient(backend)
|
||||
p.SetLogger(log.NewTestLogger())
|
||||
|
||||
t.Run("When context is cancelled the plugin is stopped", func(t *testing.T) {
|
||||
cancel()
|
||||
wgRun.Wait()
|
||||
require.ErrorIs(t, runErr, context.Canceled)
|
||||
require.True(t, unloaded)
|
||||
expectedErr := errors.New("unload failed")
|
||||
ps, err := NewPluginStoreForTest(&fakes.FakePluginRegistry{
|
||||
Store: map[string]*plugins.Plugin{
|
||||
p.ID: p,
|
||||
},
|
||||
}, &fakes.FakeLoader{
|
||||
UnloadFunc: func(_ context.Context, plugin *plugins.Plugin) (*plugins.Plugin, error) {
|
||||
return nil, expectedErr
|
||||
},
|
||||
}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ps.stopping(nil)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, expectedErr)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -186,12 +216,13 @@ func TestStore_availablePlugins(t *testing.T) {
|
|||
p1.RegisterClient(&DecommissionedPlugin{})
|
||||
p2 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-app"}}
|
||||
|
||||
ps := New(&fakes.FakePluginRegistry{
|
||||
ps, err := NewPluginStoreForTest(&fakes.FakePluginRegistry{
|
||||
Store: map[string]*plugins.Plugin{
|
||||
p1.ID: p1,
|
||||
p2.ID: p2,
|
||||
},
|
||||
}, &fakes.FakeLoader{})
|
||||
}, &fakes.FakeLoader{}, &fakes.FakeSourceRegistry{})
|
||||
require.NoError(t, err)
|
||||
|
||||
aps := ps.availablePlugins(context.Background())
|
||||
require.Len(t, aps, 1)
|
||||
|
|
|
@ -67,7 +67,7 @@ func CreateIntegrationTestCtx(t *testing.T, cfg *setting.Cfg, coreRegistry *core
|
|||
Terminator: term,
|
||||
})
|
||||
|
||||
ps, err := pluginstore.ProvideService(reg, sources.ProvideService(cfg, pCfg), l)
|
||||
ps, err := pluginstore.NewPluginStoreForTest(reg, l, sources.ProvideService(cfg, pCfg))
|
||||
require.NoError(t, err)
|
||||
|
||||
return &IntegrationTestCtx{
|
||||
|
|
Loading…
Reference in New Issue