mirror of https://github.com/grafana/grafana.git
				
				
				
			
		
			
				
	
	
		
			259 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			259 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
| package grpcplugin
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 
 | |
| 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 | |
| 	"github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin"
 | |
| 	"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
 | |
| 	"github.com/grafana/grafana/pkg/infra/log"
 | |
| 	"github.com/grafana/grafana/pkg/plugins/backendplugin"
 | |
| 	"github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2"
 | |
| 	"github.com/grafana/grafana/pkg/plugins/backendplugin/secretsmanagerplugin"
 | |
| 	"github.com/hashicorp/go-plugin"
 | |
| 	"google.golang.org/grpc/codes"
 | |
| 	"google.golang.org/grpc/status"
 | |
| )
 | |
| 
 | |
| type ClientV2 struct {
 | |
| 	grpcplugin.DiagnosticsClient
 | |
| 	grpcplugin.ResourceClient
 | |
| 	grpcplugin.DataClient
 | |
| 	grpcplugin.StreamClient
 | |
| 	pluginextensionv2.RendererPlugin
 | |
| 	secretsmanagerplugin.SecretsManagerPlugin
 | |
| }
 | |
| 
 | |
| func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (pluginClient, error) {
 | |
| 	rawDiagnostics, err := rpcClient.Dispense("diagnostics")
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	rawResource, err := rpcClient.Dispense("resource")
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	rawData, err := rpcClient.Dispense("data")
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	rawStream, err := rpcClient.Dispense("stream")
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	rawRenderer, err := rpcClient.Dispense("renderer")
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	rawSecretsManager, err := rpcClient.Dispense("secretsmanager")
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	c := ClientV2{}
 | |
| 	if rawDiagnostics != nil {
 | |
| 		if diagnosticsClient, ok := rawDiagnostics.(grpcplugin.DiagnosticsClient); ok {
 | |
| 			c.DiagnosticsClient = diagnosticsClient
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if rawResource != nil {
 | |
| 		if resourceClient, ok := rawResource.(grpcplugin.ResourceClient); ok {
 | |
| 			c.ResourceClient = resourceClient
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if rawData != nil {
 | |
| 		if dataClient, ok := rawData.(grpcplugin.DataClient); ok {
 | |
| 			c.DataClient = dataClient
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if rawStream != nil {
 | |
| 		if streamClient, ok := rawStream.(grpcplugin.StreamClient); ok {
 | |
| 			c.StreamClient = streamClient
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if rawRenderer != nil {
 | |
| 		if rendererPlugin, ok := rawRenderer.(pluginextensionv2.RendererPlugin); ok {
 | |
| 			c.RendererPlugin = rendererPlugin
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if rawSecretsManager != nil {
 | |
| 		if secretsManagerPlugin, ok := rawSecretsManager.(secretsmanagerplugin.SecretsManagerPlugin); ok {
 | |
| 			c.SecretsManagerPlugin = secretsManagerPlugin
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if descriptor.startRendererFn != nil {
 | |
| 		if err := descriptor.startRendererFn(descriptor.pluginID, c.RendererPlugin, logger); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if descriptor.startSecretsManagerFn != nil {
 | |
| 		if err := descriptor.startSecretsManagerFn(descriptor.pluginID, c.SecretsManagerPlugin, logger); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return &c, nil
 | |
| }
 | |
| 
 | |
| func (c *ClientV2) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
 | |
| 	if c.DiagnosticsClient == nil {
 | |
| 		return &backend.CollectMetricsResult{}, nil
 | |
| 	}
 | |
| 
 | |
| 	protoResp, err := c.DiagnosticsClient.CollectMetrics(ctx, backend.ToProto().CollectMetricsRequest(req))
 | |
| 	if err != nil {
 | |
| 		if status.Code(err) == codes.Unimplemented {
 | |
| 			return &backend.CollectMetricsResult{}, nil
 | |
| 		}
 | |
| 
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return backend.FromProto().CollectMetricsResponse(protoResp), nil
 | |
| }
 | |
| 
 | |
| func (c *ClientV2) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
 | |
| 	if c.DiagnosticsClient == nil {
 | |
| 		return nil, backendplugin.ErrMethodNotImplemented
 | |
| 	}
 | |
| 
 | |
| 	protoContext := backend.ToProto().PluginContext(req.PluginContext)
 | |
| 	protoResp, err := c.DiagnosticsClient.CheckHealth(ctx, &pluginv2.CheckHealthRequest{PluginContext: protoContext, Headers: req.Headers})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		if status.Code(err) == codes.Unimplemented {
 | |
| 			return &backend.CheckHealthResult{
 | |
| 				Status:  backend.HealthStatusUnknown,
 | |
| 				Message: "Health check not implemented",
 | |
| 			}, nil
 | |
| 		}
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return backend.FromProto().CheckHealthResponse(protoResp), nil
 | |
| }
 | |
| 
 | |
| func (c *ClientV2) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
 | |
| 	if c.DataClient == nil {
 | |
| 		return nil, backendplugin.ErrMethodNotImplemented
 | |
| 	}
 | |
| 
 | |
| 	protoReq := backend.ToProto().QueryDataRequest(req)
 | |
| 	protoResp, err := c.DataClient.QueryData(ctx, protoReq)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		if status.Code(err) == codes.Unimplemented {
 | |
| 			return nil, backendplugin.ErrMethodNotImplemented
 | |
| 		}
 | |
| 
 | |
| 		return nil, fmt.Errorf("%v: %w", "Failed to query data", err)
 | |
| 	}
 | |
| 
 | |
| 	return backend.FromProto().QueryDataResponse(protoResp)
 | |
| }
 | |
| 
 | |
| func (c *ClientV2) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
 | |
| 	if c.ResourceClient == nil {
 | |
| 		return backendplugin.ErrMethodNotImplemented
 | |
| 	}
 | |
| 
 | |
| 	protoReq := backend.ToProto().CallResourceRequest(req)
 | |
| 	protoStream, err := c.ResourceClient.CallResource(ctx, protoReq)
 | |
| 	if err != nil {
 | |
| 		if status.Code(err) == codes.Unimplemented {
 | |
| 			return backendplugin.ErrMethodNotImplemented
 | |
| 		}
 | |
| 
 | |
| 		return fmt.Errorf("%v: %w", "Failed to call resource", err)
 | |
| 	}
 | |
| 
 | |
| 	for {
 | |
| 		protoResp, err := protoStream.Recv()
 | |
| 		if err != nil {
 | |
| 			if status.Code(err) == codes.Unimplemented {
 | |
| 				return backendplugin.ErrMethodNotImplemented
 | |
| 			}
 | |
| 
 | |
| 			if errors.Is(err, io.EOF) {
 | |
| 				return nil
 | |
| 			}
 | |
| 
 | |
| 			return fmt.Errorf("%v: %w", "failed to receive call resource response", err)
 | |
| 		}
 | |
| 
 | |
| 		if err := sender.Send(backend.FromProto().CallResourceResponse(protoResp)); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *ClientV2) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
 | |
| 	if c.StreamClient == nil {
 | |
| 		return nil, backendplugin.ErrMethodNotImplemented
 | |
| 	}
 | |
| 	protoResp, err := c.StreamClient.SubscribeStream(ctx, backend.ToProto().SubscribeStreamRequest(req))
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return backend.FromProto().SubscribeStreamResponse(protoResp), nil
 | |
| }
 | |
| 
 | |
| func (c *ClientV2) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
 | |
| 	if c.StreamClient == nil {
 | |
| 		return nil, backendplugin.ErrMethodNotImplemented
 | |
| 	}
 | |
| 	protoResp, err := c.StreamClient.PublishStream(ctx, backend.ToProto().PublishStreamRequest(req))
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return backend.FromProto().PublishStreamResponse(protoResp), nil
 | |
| }
 | |
| 
 | |
| func (c *ClientV2) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
 | |
| 	if c.StreamClient == nil {
 | |
| 		return backendplugin.ErrMethodNotImplemented
 | |
| 	}
 | |
| 
 | |
| 	protoReq := backend.ToProto().RunStreamRequest(req)
 | |
| 	protoStream, err := c.StreamClient.RunStream(ctx, protoReq)
 | |
| 	if err != nil {
 | |
| 		if status.Code(err) == codes.Unimplemented {
 | |
| 			return backendplugin.ErrMethodNotImplemented
 | |
| 		}
 | |
| 		return fmt.Errorf("%v: %w", "Failed to call resource", err)
 | |
| 	}
 | |
| 
 | |
| 	for {
 | |
| 		p, err := protoStream.Recv()
 | |
| 		if err != nil {
 | |
| 			if status.Code(err) == codes.Unimplemented {
 | |
| 				return backendplugin.ErrMethodNotImplemented
 | |
| 			}
 | |
| 			if errors.Is(err, io.EOF) {
 | |
| 				return nil
 | |
| 			}
 | |
| 			return fmt.Errorf("error running stream: %w", err)
 | |
| 		}
 | |
| 		// From GRPC connection we receive already prepared JSON.
 | |
| 		err = sender.SendJSON(p.Data)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| }
 |