grafana/pkg/operators/provisioning/config.go

373 lines
13 KiB
Go

package provisioning
import (
"context"
"crypto/x509"
"fmt"
"net/http"
"os"
"time"
"github.com/grafana/authlib/authn"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/apiserver"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified"
"github.com/grafana/grafana/pkg/storage/unified/resource"
provisioning "github.com/grafana/grafana/apps/provisioning/pkg/apis/provisioning/v0alpha1"
authrt "github.com/grafana/grafana/apps/provisioning/pkg/auth"
client "github.com/grafana/grafana/apps/provisioning/pkg/generated/clientset/versioned"
"github.com/grafana/grafana/apps/provisioning/pkg/repository"
"github.com/grafana/grafana/apps/provisioning/pkg/repository/git"
"github.com/grafana/grafana/apps/provisioning/pkg/repository/github"
"github.com/grafana/grafana/apps/provisioning/pkg/repository/local"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/resources"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/webhooks"
secretdecrypt "github.com/grafana/grafana/pkg/registry/apis/secret/decrypt"
)
// provisioningControllerConfig contains the configuration that overlaps for the jobs and repo controllers
type provisioningControllerConfig struct {
provisioningClient *client.Clientset
resyncInterval time.Duration
repoFactory repository.Factory
unified resources.ResourceStore
clients resources.ClientFactory
}
// expects:
// [grpc_client_authentication]
// token =
// token_exchange_url =
// [secrets_manager]
// grpc_server_address =
// grpc_server_tls_server_name =
// grpc_server_use_tls =
// grpc_server_tls_ca_file =
// grpc_server_tls_skip_verify =
// [unified_storage]
// grpc_address =
// grpc_index_address =
// allow_insecure =
// audiences =
// [operator]
// provisioning_server_url =
// provisioning_server_public_url =
// dashboards_server_url =
// folders_server_url =
// tls_insecure =
// tls_cert_file =
// tls_key_file =
// tls_ca_file =
// resync_interval =
// home_path =
// local_permitted_prefixes =
// [provisioning]
// repository_types =
func setupFromConfig(cfg *setting.Cfg, registry prometheus.Registerer) (controllerCfg *provisioningControllerConfig, err error) {
if cfg == nil {
return nil, fmt.Errorf("no configuration available")
}
// TODO: we should setup tracing properly
// https://github.com/grafana/git-ui-sync-project/issues/507
tracer := tracing.NewNoopTracerService()
gRPCAuth := cfg.SectionWithEnvOverrides("grpc_client_authentication")
token := gRPCAuth.Key("token").String()
if token == "" {
return nil, fmt.Errorf("token is required in [grpc_client_authentication] section")
}
tokenExchangeURL := gRPCAuth.Key("token_exchange_url").String()
if tokenExchangeURL == "" {
return nil, fmt.Errorf("token_exchange_url is required in [grpc_client_authentication] section")
}
operatorSec := cfg.SectionWithEnvOverrides("operator")
provisioningServerURL := operatorSec.Key("provisioning_server_url").String()
if provisioningServerURL == "" {
return nil, fmt.Errorf("provisioning_server_url is required in [operator] section")
}
tlsInsecure := operatorSec.Key("tls_insecure").MustBool(false)
tlsCertFile := operatorSec.Key("tls_cert_file").String()
tlsKeyFile := operatorSec.Key("tls_key_file").String()
tlsCAFile := operatorSec.Key("tls_ca_file").String()
tokenExchangeClient, err := authn.NewTokenExchangeClient(authn.TokenExchangeConfig{
TokenExchangeURL: tokenExchangeURL,
Token: token,
})
if err != nil {
return nil, fmt.Errorf("failed to create token exchange client: %w", err)
}
tlsConfig, err := buildTLSConfig(tlsInsecure, tlsCertFile, tlsKeyFile, tlsCAFile)
if err != nil {
return nil, fmt.Errorf("failed to build TLS configuration: %w", err)
}
config := &rest.Config{
APIPath: "/apis",
Host: provisioningServerURL,
WrapTransport: transport.WrapperFunc(func(rt http.RoundTripper) http.RoundTripper {
return authrt.NewRoundTripper(tokenExchangeClient, rt, provisioning.GROUP)
}),
TLSClientConfig: tlsConfig,
}
provisioningClient, err := client.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create provisioning client: %w", err)
}
decrypter, err := setupDecrypter(cfg, tracer, tokenExchangeClient)
if err != nil {
return nil, fmt.Errorf("failed to setup decrypter: %w", err)
}
repoFactory, err := setupRepoFactory(cfg, decrypter, provisioningClient, registry)
if err != nil {
return nil, fmt.Errorf("failed to setup repository getter: %w", err)
}
// HACK: This logic directly connects to unified storage. We are doing this for now as there is no global
// search endpoint. But controllers, in general, should not connect directly to unified storage and instead
// go through the api server. Once there is a global search endpoint, we will switch to that here as well.
resourceClientCfg := resource.RemoteResourceClientConfig{
Token: token,
TokenExchangeURL: tokenExchangeURL,
Namespace: gRPCAuth.Key("token_namespace").String(),
}
unified, err := setupUnifiedStorageClient(cfg, tracer, resourceClientCfg)
if err != nil {
return nil, fmt.Errorf("failed to setup unified storage: %w", err)
}
dashboardsServerURL := operatorSec.Key("dashboards_server_url").String()
if dashboardsServerURL == "" {
return nil, fmt.Errorf("dashboards_server_url is required in [operator] section")
}
foldersServerURL := operatorSec.Key("folders_server_url").String()
if foldersServerURL == "" {
return nil, fmt.Errorf("folders_server_url is required in [operator] section")
}
apiServerURLs := map[string]string{
resources.DashboardResource.Group: dashboardsServerURL,
resources.FolderResource.Group: foldersServerURL,
provisioning.GROUP: provisioningServerURL,
}
configProviders := make(map[string]apiserver.RestConfigProvider)
for group, url := range apiServerURLs {
config := &rest.Config{
APIPath: "/apis",
Host: url,
WrapTransport: transport.WrapperFunc(func(rt http.RoundTripper) http.RoundTripper {
return authrt.NewRoundTripper(tokenExchangeClient, rt, group)
}),
TLSClientConfig: tlsConfig,
}
configProviders[group] = NewDirectConfigProvider(config)
}
clients := resources.NewClientFactoryForMultipleAPIServers(configProviders)
return &provisioningControllerConfig{
provisioningClient: provisioningClient,
repoFactory: repoFactory,
unified: unified,
clients: clients,
resyncInterval: operatorSec.Key("resync_interval").MustDuration(60 * time.Second),
}, nil
}
func buildTLSConfig(insecure bool, certFile, keyFile, caFile string) (rest.TLSClientConfig, error) {
tlsConfig := rest.TLSClientConfig{
Insecure: insecure,
}
if certFile != "" && keyFile != "" {
tlsConfig.CertFile = certFile
tlsConfig.KeyFile = keyFile
}
if caFile != "" {
// caFile is set in operator.ini file
// nolint:gosec
caCert, err := os.ReadFile(caFile)
if err != nil {
return tlsConfig, fmt.Errorf("failed to read CA certificate file: %w", err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return tlsConfig, fmt.Errorf("failed to parse CA certificate")
}
tlsConfig.CAData = caCert
}
return tlsConfig, nil
}
func setupRepoFactory(
cfg *setting.Cfg,
decrypter repository.Decrypter,
provisioningClient *client.Clientset,
registry prometheus.Registerer,
) (repository.Factory, error) {
operatorSec := cfg.SectionWithEnvOverrides("operator")
provisioningSec := cfg.SectionWithEnvOverrides("provisioning")
repoTypes := provisioningSec.Key("repository_types").Strings("|")
if len(repoTypes) == 0 {
repoTypes = []string{"github"}
}
// TODO: This depends on the different flavor of Grafana
// https://github.com/grafana/git-ui-sync-project/issues/495
extras := make([]repository.Extra, 0)
alreadyRegistered := make(map[provisioning.RepositoryType]struct{})
for _, t := range repoTypes {
if _, ok := alreadyRegistered[provisioning.RepositoryType(t)]; ok {
continue
}
alreadyRegistered[provisioning.RepositoryType(t)] = struct{}{}
switch provisioning.RepositoryType(t) {
case provisioning.GitRepositoryType:
extras = append(extras, git.Extra(decrypter))
case provisioning.GitHubRepositoryType:
var webhook *webhooks.WebhookExtraBuilder
provisioningAppURL := operatorSec.Key("provisioning_server_public_url").String()
if provisioningAppURL != "" {
webhook = webhooks.ProvideWebhooks(provisioningAppURL, registry)
}
extras = append(extras, github.Extra(
decrypter,
github.ProvideFactory(),
webhook,
),
)
case provisioning.LocalRepositoryType:
homePath := operatorSec.Key("home_path").String()
if homePath == "" {
return nil, fmt.Errorf("home_path is required in [operator] section for local repository type")
}
permittedPrefixes := operatorSec.Key("local_permitted_prefixes").Strings("|")
if len(permittedPrefixes) == 0 {
return nil, fmt.Errorf("local_permitted_prefixes is required in [operator] section for local repository type")
}
extras = append(extras, local.Extra(
homePath,
permittedPrefixes,
))
default:
return nil, fmt.Errorf("unsupported repository type: %s", t)
}
}
repoFactory, err := repository.ProvideFactory(alreadyRegistered, extras)
if err != nil {
return nil, fmt.Errorf("create repository factory: %w", err)
}
return repoFactory, nil
}
func setupDecrypter(cfg *setting.Cfg, tracer tracing.Tracer, tokenExchangeClient *authn.TokenExchangeClient) (decrypter repository.Decrypter, err error) {
secretsSec := cfg.SectionWithEnvOverrides("secrets_manager")
if secretsSec == nil {
return nil, fmt.Errorf("no [secrets_manager] section found in config")
}
address := secretsSec.Key("grpc_server_address").String()
if address == "" {
return nil, fmt.Errorf("grpc_server_address is required in [secrets_manager] section")
}
secretsTls := secretdecrypt.TLSConfig{
UseTLS: secretsSec.Key("grpc_server_use_tls").MustBool(true),
CAFile: secretsSec.Key("grpc_server_tls_ca_file").String(),
ServerName: secretsSec.Key("grpc_server_tls_server_name").String(),
InsecureSkipVerify: secretsSec.Key("grpc_server_tls_skip_verify").MustBool(false),
}
decryptSvc, err := secretdecrypt.NewGRPCDecryptClientWithTLS(
tokenExchangeClient,
tracer,
address,
secretsTls,
)
if err != nil {
return nil, fmt.Errorf("create decrypt service: %w", err)
}
return repository.ProvideDecrypter(decryptSvc), nil
}
// HACK: This logic directly connects to unified storage. We are doing this for now as there is no global
// search endpoint. But controllers, in general, should not connect directly to unified storage and instead
// go through the api server. Once there is a global search endpoint, we will switch to that here as well.
func setupUnifiedStorageClient(cfg *setting.Cfg, tracer tracing.Tracer, resourceClientCfg resource.RemoteResourceClientConfig) (resources.ResourceStore, error) {
unifiedStorageSec := cfg.SectionWithEnvOverrides("unified_storage")
// Connect to Server
address := unifiedStorageSec.Key("grpc_address").String()
if address == "" {
return nil, fmt.Errorf("grpc_address is required in [unified_storage] section")
}
// FIXME: These metrics are not going to show up in /metrics
registry := prometheus.NewPedanticRegistry()
conn, err := unified.GrpcConn(address, registry)
if err != nil {
return nil, fmt.Errorf("create unified storage gRPC connection: %w", err)
}
// Connect to Index
indexConn := conn
indexAddress := unifiedStorageSec.Key("grpc_index_address").String()
if indexAddress != "" {
// FIXME: These metrics are not going to show up in /metrics. We will also need to wrap these metrics
// to start with something else so it doesn't collide with the storage api metrics.
registry2 := prometheus.NewPedanticRegistry()
indexConn, err = unified.GrpcConn(indexAddress, registry2)
if err != nil {
return nil, fmt.Errorf("create unified storage index gRPC connection: %w", err)
}
}
// Create client
resourceClientCfg.AllowInsecure = unifiedStorageSec.Key("allow_insecure").MustBool(false)
resourceClientCfg.Audiences = unifiedStorageSec.Key("audiences").Strings("|")
client, err := resource.NewRemoteResourceClient(tracer, conn, indexConn, resourceClientCfg)
if err != nil {
return nil, fmt.Errorf("create unified storage client: %w", err)
}
return client, nil
}
// directConfigProvider is a simple RestConfigProvider that always returns the same rest.Config
// it implements apiserver.RestConfigProvider
type directConfigProvider struct {
cfg *rest.Config
}
func NewDirectConfigProvider(cfg *rest.Config) apiserver.RestConfigProvider {
return &directConfigProvider{cfg: cfg}
}
func (r *directConfigProvider) GetRestConfig(ctx context.Context) (*rest.Config, error) {
return r.cfg, nil
}