mirror of https://github.com/grafana/grafana.git
				
				
				
			
		
			
				
	
	
		
			410 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			410 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
| package expr
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 | |
| 	"github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter"
 | |
| 	data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
 | |
| 	"go.opentelemetry.io/otel/attribute"
 | |
| 	"go.opentelemetry.io/otel/codes"
 | |
| 	"gonum.org/v1/gonum/graph/simple"
 | |
| 
 | |
| 	"github.com/grafana/grafana/pkg/expr/classic"
 | |
| 	"github.com/grafana/grafana/pkg/expr/mathexp"
 | |
| 	"github.com/grafana/grafana/pkg/infra/log"
 | |
| 	"github.com/grafana/grafana/pkg/services/datasources"
 | |
| 	"github.com/grafana/grafana/pkg/services/featuremgmt"
 | |
| )
 | |
| 
 | |
| // label that is used when all mathexp.Series have 0 labels to make them identifiable by labels. The value of this label is extracted from value field names
 | |
| const nameLabelName = "__name__"
 | |
| 
 | |
| var (
 | |
| 	logger = log.New("expr")
 | |
| )
 | |
| 
 | |
| // baseNode includes common properties used across DPNodes.
 | |
| type baseNode struct {
 | |
| 	id    int64
 | |
| 	refID string
 | |
| }
 | |
| 
 | |
| type rawNode struct {
 | |
| 	RefID      string `json:"refId"`
 | |
| 	Query      map[string]any
 | |
| 	QueryRaw   []byte
 | |
| 	QueryType  string
 | |
| 	TimeRange  TimeRange
 | |
| 	DataSource *datasources.DataSource
 | |
| 	// We use this index as the id of the node graph so the order can remain during a the stable sort of the dependency graph execution order.
 | |
| 	// Some data sources, such as cloud watch, have order dependencies between queries.
 | |
| 	idx int64
 | |
| }
 | |
| 
 | |
| func getExpressionCommandTypeString(rawQuery map[string]any) (string, error) {
 | |
| 	rawType, ok := rawQuery["type"]
 | |
| 	if !ok {
 | |
| 		return "", errors.New("no expression command type in query")
 | |
| 	}
 | |
| 	typeString, ok := rawType.(string)
 | |
| 	if !ok {
 | |
| 		return "", fmt.Errorf("expected expression command type to be a string, got type %T", rawType)
 | |
| 	}
 | |
| 	return typeString, nil
 | |
| }
 | |
| 
 | |
| func GetExpressionCommandType(rawQuery map[string]any) (c CommandType, err error) {
 | |
| 	typeString, err := getExpressionCommandTypeString(rawQuery)
 | |
| 	if err != nil {
 | |
| 		return c, err
 | |
| 	}
 | |
| 	return ParseCommandType(typeString)
 | |
| }
 | |
| 
 | |
| // String returns a string representation of the node. In particular for
 | |
| // %v formatting in error messages.
 | |
| func (b *baseNode) String() string {
 | |
| 	return b.refID
 | |
| }
 | |
| 
 | |
| // CMDNode is a DPNode that holds an expression command.
 | |
| type CMDNode struct {
 | |
| 	baseNode
 | |
| 	CMDType CommandType
 | |
| 	Command Command
 | |
| }
 | |
| 
 | |
| // ID returns the id of the node so it can fulfill the gonum's graph Node interface.
 | |
| func (b *baseNode) ID() int64 {
 | |
| 	return b.id
 | |
| }
 | |
| 
 | |
| // RefID returns the refId of the node.
 | |
| func (b *baseNode) RefID() string {
 | |
| 	return b.refID
 | |
| }
 | |
| 
 | |
| // NodeType returns the data pipeline node type.
 | |
| func (gn *CMDNode) NodeType() NodeType {
 | |
| 	return TypeCMDNode
 | |
| }
 | |
| 
 | |
| func (gn *CMDNode) NeedsVars() []string {
 | |
| 	return gn.Command.NeedsVars()
 | |
| }
 | |
| 
 | |
| // Execute runs the node and adds the results to vars. If the node requires
 | |
| // other nodes they must have already been executed and their results must
 | |
| // already by in vars.
 | |
| func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
 | |
| 	return gn.Command.Execute(ctx, now, vars, s.tracer)
 | |
| }
 | |
| 
 | |
| func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles) (*CMDNode, error) {
 | |
| 	commandType, err := GetExpressionCommandType(rn.Query)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("invalid command type in expression '%v': %w", rn.RefID, err)
 | |
| 	}
 | |
| 
 | |
| 	node := &CMDNode{
 | |
| 		baseNode: baseNode{
 | |
| 			id:    rn.idx,
 | |
| 			refID: rn.RefID,
 | |
| 		},
 | |
| 		CMDType: commandType,
 | |
| 	}
 | |
| 
 | |
| 	if toggles.IsEnabledGlobally(featuremgmt.FlagExpressionParser) {
 | |
| 		rn.QueryType, err = getExpressionCommandTypeString(rn.Query)
 | |
| 		if err != nil {
 | |
| 			return nil, err // should not happen because the command was parsed first thing
 | |
| 		}
 | |
| 
 | |
| 		// NOTE: this structure of this is weird now, because it is targeting a structure
 | |
| 		// where this is actually run in the root loop, however we want to verify the individual
 | |
| 		// node parsing before changing the full tree parser
 | |
| 		reader := NewExpressionQueryReader(toggles)
 | |
| 		iter, err := jsoniter.ParseBytes(jsoniter.ConfigDefault, rn.QueryRaw)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		q, err := reader.ReadQuery(data.NewDataQuery(map[string]any{
 | |
| 			"refId": rn.RefID,
 | |
| 			"type":  rn.QueryType,
 | |
| 		}), iter)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		node.Command = q.Command
 | |
| 		return node, err
 | |
| 	}
 | |
| 
 | |
| 	switch commandType {
 | |
| 	case TypeMath:
 | |
| 		node.Command, err = UnmarshalMathCommand(rn)
 | |
| 	case TypeReduce:
 | |
| 		node.Command, err = UnmarshalReduceCommand(rn)
 | |
| 	case TypeResample:
 | |
| 		node.Command, err = UnmarshalResampleCommand(rn)
 | |
| 	case TypeClassicConditions:
 | |
| 		node.Command, err = classic.UnmarshalConditionsCmd(rn.Query, rn.RefID)
 | |
| 	case TypeThreshold:
 | |
| 		node.Command, err = UnmarshalThresholdCommand(rn, toggles)
 | |
| 	case TypeSQL:
 | |
| 		node.Command, err = UnmarshalSQLCommand(rn)
 | |
| 	default:
 | |
| 		return nil, fmt.Errorf("expression command type '%v' in expression '%v' not implemented", commandType, rn.RefID)
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to parse expression '%v': %w", rn.RefID, err)
 | |
| 	}
 | |
| 
 | |
| 	return node, nil
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	defaultIntervalMS = int64(64)
 | |
| 	defaultMaxDP      = int64(5000)
 | |
| )
 | |
| 
 | |
| // DSNode is a DPNode that holds a datasource request.
 | |
| type DSNode struct {
 | |
| 	baseNode
 | |
| 	query      json.RawMessage
 | |
| 	datasource *datasources.DataSource
 | |
| 
 | |
| 	orgID      int64
 | |
| 	queryType  string
 | |
| 	timeRange  TimeRange
 | |
| 	intervalMS int64
 | |
| 	maxDP      int64
 | |
| 	request    Request
 | |
| }
 | |
| 
 | |
| func (dn *DSNode) String() string {
 | |
| 	if dn.datasource == nil {
 | |
| 		return "unknown"
 | |
| 	}
 | |
| 	return dn.datasource.Type
 | |
| }
 | |
| 
 | |
| // NodeType returns the data pipeline node type.
 | |
| func (dn *DSNode) NodeType() NodeType {
 | |
| 	return TypeDatasourceNode
 | |
| }
 | |
| 
 | |
| // NodeType returns the data pipeline node type.
 | |
| func (dn *DSNode) NeedsVars() []string {
 | |
| 	return []string{}
 | |
| }
 | |
| 
 | |
| func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (*DSNode, error) {
 | |
| 	if rn.TimeRange == nil {
 | |
| 		return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID)
 | |
| 	}
 | |
| 	encodedQuery, err := json.Marshal(rn.Query)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	dsNode := &DSNode{
 | |
| 		baseNode: baseNode{
 | |
| 			id:    rn.idx,
 | |
| 			refID: rn.RefID,
 | |
| 		},
 | |
| 		orgID:      req.OrgId,
 | |
| 		query:      json.RawMessage(encodedQuery),
 | |
| 		queryType:  rn.QueryType,
 | |
| 		intervalMS: defaultIntervalMS,
 | |
| 		maxDP:      defaultMaxDP,
 | |
| 		timeRange:  rn.TimeRange,
 | |
| 		request:    *req,
 | |
| 		datasource: rn.DataSource,
 | |
| 	}
 | |
| 
 | |
| 	var floatIntervalMS float64
 | |
| 	if rawIntervalMS, ok := rn.Query["intervalMs"]; ok {
 | |
| 		if floatIntervalMS, ok = rawIntervalMS.(float64); !ok {
 | |
| 			return nil, fmt.Errorf("expected intervalMs to be an float64, got type %T for refId %v", rawIntervalMS, rn.RefID)
 | |
| 		}
 | |
| 		dsNode.intervalMS = int64(floatIntervalMS)
 | |
| 	}
 | |
| 
 | |
| 	var floatMaxDP float64
 | |
| 	if rawMaxDP, ok := rn.Query["maxDataPoints"]; ok {
 | |
| 		if floatMaxDP, ok = rawMaxDP.(float64); !ok {
 | |
| 			return nil, fmt.Errorf("expected maxDataPoints to be an float64, got type %T for refId %v", rawMaxDP, rn.RefID)
 | |
| 		}
 | |
| 		dsNode.maxDP = int64(floatMaxDP)
 | |
| 	}
 | |
| 
 | |
| 	return dsNode, nil
 | |
| }
 | |
| 
 | |
| // executeDSNodesGrouped groups datasource node queries by the datasource instance, and then sends them
 | |
| // in a single request with one or more queries to the datasource.
 | |
| func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service, nodes []*DSNode) {
 | |
| 	type dsKey struct {
 | |
| 		uid   string // in theory I think this all I need for the key, but rather be safe
 | |
| 		id    int64
 | |
| 		orgID int64
 | |
| 	}
 | |
| 	byDS := make(map[dsKey][]*DSNode)
 | |
| 	for _, node := range nodes {
 | |
| 		k := dsKey{id: node.datasource.ID, uid: node.datasource.UID, orgID: node.orgID}
 | |
| 		byDS[k] = append(byDS[k], node)
 | |
| 	}
 | |
| 
 | |
| 	for _, nodeGroup := range byDS {
 | |
| 		func() {
 | |
| 			ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
 | |
| 			defer span.End()
 | |
| 			firstNode := nodeGroup[0]
 | |
| 			pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource)
 | |
| 			if err != nil {
 | |
| 				for _, dn := range nodeGroup {
 | |
| 					vars[dn.refID] = mathexp.Results{Error: datasources.ErrDataSourceNotFound}
 | |
| 				}
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			logger := logger.FromContext(ctx).New("datasourceType", firstNode.datasource.Type,
 | |
| 				"queryRefId", firstNode.refID,
 | |
| 				"datasourceUid", firstNode.datasource.UID,
 | |
| 				"datasourceVersion", firstNode.datasource.Version,
 | |
| 			)
 | |
| 
 | |
| 			span.SetAttributes(
 | |
| 				attribute.String("datasource.type", firstNode.datasource.Type),
 | |
| 				attribute.String("datasource.uid", firstNode.datasource.UID),
 | |
| 			)
 | |
| 
 | |
| 			req := &backend.QueryDataRequest{
 | |
| 				PluginContext: pCtx,
 | |
| 				Headers:       firstNode.request.Headers,
 | |
| 			}
 | |
| 
 | |
| 			for _, dn := range nodeGroup {
 | |
| 				req.Queries = append(req.Queries, backend.DataQuery{
 | |
| 					RefID:         dn.refID,
 | |
| 					MaxDataPoints: dn.maxDP,
 | |
| 					Interval:      time.Duration(int64(time.Millisecond) * dn.intervalMS),
 | |
| 					JSON:          dn.query,
 | |
| 					TimeRange:     dn.timeRange.AbsoluteTime(now),
 | |
| 					QueryType:     dn.queryType,
 | |
| 				})
 | |
| 			}
 | |
| 
 | |
| 			instrument := func(e error, rt string) {
 | |
| 				respStatus := "success"
 | |
| 				responseType := rt
 | |
| 				if e != nil {
 | |
| 					responseType = "error"
 | |
| 					respStatus = "failure"
 | |
| 					span.SetStatus(codes.Error, "failed to query data source")
 | |
| 					span.RecordError(e)
 | |
| 				}
 | |
| 				logger.Debug("Data source queried", "responseType", responseType)
 | |
| 				useDataplane := strings.HasPrefix(responseType, "dataplane-")
 | |
| 				s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
 | |
| 			}
 | |
| 
 | |
| 			resp, err := s.dataService.QueryData(ctx, req)
 | |
| 			if err != nil {
 | |
| 				for _, dn := range nodeGroup {
 | |
| 					vars[dn.refID] = mathexp.Results{Error: MakeQueryError(firstNode.refID, firstNode.datasource.UID, err)}
 | |
| 				}
 | |
| 				instrument(err, "")
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			for _, dn := range nodeGroup {
 | |
| 				dataFrames, err := getResponseFrame(logger, resp, dn.refID)
 | |
| 				if err != nil {
 | |
| 					vars[dn.refID] = mathexp.Results{Error: MakeQueryError(dn.refID, dn.datasource.UID, err)}
 | |
| 					instrument(err, "")
 | |
| 					return
 | |
| 				}
 | |
| 
 | |
| 				var result mathexp.Results
 | |
| 				responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames, s.allowLongFrames)
 | |
| 				if err != nil {
 | |
| 					result.Error = makeConversionError(dn.RefID(), err)
 | |
| 				}
 | |
| 				instrument(err, responseType)
 | |
| 				vars[dn.refID] = result
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Execute runs the node and adds the results to vars. If the node requires
 | |
| // other nodes they must have already been executed and their results must
 | |
| // already by in vars.
 | |
| func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
 | |
| 	logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version)
 | |
| 	ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, dn.datasource.Type, dn.request.User, dn.datasource)
 | |
| 	if err != nil {
 | |
| 		return mathexp.Results{}, err
 | |
| 	}
 | |
| 	span.SetAttributes(
 | |
| 		attribute.String("datasource.type", dn.datasource.Type),
 | |
| 		attribute.String("datasource.uid", dn.datasource.UID),
 | |
| 	)
 | |
| 
 | |
| 	req := &backend.QueryDataRequest{
 | |
| 		PluginContext: pCtx,
 | |
| 		Queries: []backend.DataQuery{
 | |
| 			{
 | |
| 				RefID:         dn.refID,
 | |
| 				MaxDataPoints: dn.maxDP,
 | |
| 				Interval:      time.Duration(int64(time.Millisecond) * dn.intervalMS),
 | |
| 				JSON:          dn.query,
 | |
| 				TimeRange:     dn.timeRange.AbsoluteTime(now),
 | |
| 				QueryType:     dn.queryType,
 | |
| 			},
 | |
| 		},
 | |
| 		Headers: dn.request.Headers,
 | |
| 	}
 | |
| 
 | |
| 	responseType := "unknown"
 | |
| 	respStatus := "success"
 | |
| 	defer func() {
 | |
| 		if e != nil {
 | |
| 			responseType = "error"
 | |
| 			respStatus = "failure"
 | |
| 			span.SetStatus(codes.Error, "failed to query data source")
 | |
| 			span.RecordError(e)
 | |
| 		}
 | |
| 		logger.Debug("Data source queried", "responseType", responseType)
 | |
| 		useDataplane := strings.HasPrefix(responseType, "dataplane-")
 | |
| 		s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
 | |
| 	}()
 | |
| 
 | |
| 	resp, err := s.dataService.QueryData(ctx, req)
 | |
| 	if err != nil {
 | |
| 		return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
 | |
| 	}
 | |
| 
 | |
| 	dataFrames, err := getResponseFrame(logger, resp, dn.refID)
 | |
| 	if err != nil {
 | |
| 		return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
 | |
| 	}
 | |
| 
 | |
| 	var result mathexp.Results
 | |
| 	responseType, result, err = s.converter.Convert(ctx, dn.datasource.Type, dataFrames, s.allowLongFrames)
 | |
| 	if err != nil {
 | |
| 		err = makeConversionError(dn.refID, err)
 | |
| 	}
 | |
| 	return result, err
 | |
| }
 |