mirror of https://github.com/grafana/grafana.git
				
				
				
			
		
			
				
	
	
		
			482 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			482 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
package expr
 | 
						||
 | 
						||
import (
 | 
						||
	"context"
 | 
						||
	"errors"
 | 
						||
	"fmt"
 | 
						||
	"time"
 | 
						||
 | 
						||
	"github.com/grafana/grafana-plugin-sdk-go/backend"
 | 
						||
	"github.com/grafana/grafana-plugin-sdk-go/data"
 | 
						||
	"github.com/prometheus/client_golang/prometheus"
 | 
						||
	"go.opentelemetry.io/otel/attribute"
 | 
						||
	"go.opentelemetry.io/otel/codes"
 | 
						||
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
 | 
						||
	"go.opentelemetry.io/otel/trace"
 | 
						||
 | 
						||
	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
 | 
						||
	"github.com/grafana/grafana/pkg/expr/mathexp"
 | 
						||
	"github.com/grafana/grafana/pkg/expr/metrics"
 | 
						||
	"github.com/grafana/grafana/pkg/expr/sql"
 | 
						||
	"github.com/grafana/grafana/pkg/infra/tracing"
 | 
						||
	"github.com/grafana/grafana/pkg/setting"
 | 
						||
)
 | 
						||
 | 
						||
const SQLLoggerName = "expr.sql"
 | 
						||
 | 
						||
// SQLCommand is an expression to run SQL over results
 | 
						||
type SQLCommand struct {
 | 
						||
	query       string
 | 
						||
	varsToQuery []string
 | 
						||
	refID       string
 | 
						||
 | 
						||
	format string
 | 
						||
 | 
						||
	inputLimit  int64
 | 
						||
	outputLimit int64
 | 
						||
	timeout     time.Duration
 | 
						||
	logger      log.Logger
 | 
						||
}
 | 
						||
 | 
						||
// NewSQLCommand creates a new SQLCommand.
 | 
						||
func NewSQLCommand(ctx context.Context, logger log.Logger, refID, format, rawSQL string, intputLimit, outputLimit int64, timeout time.Duration) (*SQLCommand, error) {
 | 
						||
	sqlLogger := backend.NewLoggerWith("logger", SQLLoggerName).FromContext(ctx)
 | 
						||
	if rawSQL == "" {
 | 
						||
		return nil, sql.MakeErrEmptyQuery(refID)
 | 
						||
	}
 | 
						||
	tables, err := sql.TablesList(ctx, rawSQL)
 | 
						||
	if err != nil {
 | 
						||
		sqlLogger.Warn("invalid sql query", "sql", rawSQL, "error", err)
 | 
						||
		return nil, sql.MakeErrInvalidQuery(refID, err)
 | 
						||
	}
 | 
						||
	if len(tables) == 0 {
 | 
						||
		sqlLogger.Warn("no tables found in SQL query", "sql", rawSQL)
 | 
						||
	}
 | 
						||
	if tables != nil {
 | 
						||
		sqlLogger.Debug("REF tables", "tables", tables, "sql", rawSQL)
 | 
						||
	}
 | 
						||
 | 
						||
	return &SQLCommand{
 | 
						||
		query:       rawSQL,
 | 
						||
		varsToQuery: tables,
 | 
						||
		refID:       refID,
 | 
						||
		inputLimit:  intputLimit,
 | 
						||
		outputLimit: outputLimit,
 | 
						||
		timeout:     timeout,
 | 
						||
		format:      format,
 | 
						||
		logger:      sqlLogger,
 | 
						||
	}, nil
 | 
						||
}
 | 
						||
 | 
						||
// UnmarshalSQLCommand creates a SQLCommand from Grafana's frontend query.
 | 
						||
func UnmarshalSQLCommand(ctx context.Context, rn *rawNode, cfg *setting.Cfg) (*SQLCommand, error) {
 | 
						||
	sqlLogger := backend.NewLoggerWith("logger", SQLLoggerName).FromContext(ctx)
 | 
						||
	if rn.TimeRange == nil {
 | 
						||
		sqlLogger.Error("time range must be specified for refID", "refID", rn.RefID)
 | 
						||
		return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID)
 | 
						||
	}
 | 
						||
 | 
						||
	expressionRaw, ok := rn.Query["expression"]
 | 
						||
	if !ok {
 | 
						||
		sqlLogger.Error("no expression in the query", "query", rn.Query)
 | 
						||
		return nil, errors.New("no expression in the query")
 | 
						||
	}
 | 
						||
	expression, ok := expressionRaw.(string)
 | 
						||
	if !ok {
 | 
						||
		sqlLogger.Error("expected sql expression to be type string", "expression", expressionRaw)
 | 
						||
		return nil, fmt.Errorf("expected sql expression to be type string, but got type %T", expressionRaw)
 | 
						||
	}
 | 
						||
 | 
						||
	if cfg.SQLExpressionQueryLengthLimit > 0 && len(expression) > int(cfg.SQLExpressionQueryLengthLimit) {
 | 
						||
		return nil, sql.MakeQueryTooLongError(rn.RefID, cfg.SQLExpressionQueryLengthLimit)
 | 
						||
	}
 | 
						||
 | 
						||
	formatRaw := rn.Query["format"]
 | 
						||
	format, _ := formatRaw.(string)
 | 
						||
 | 
						||
	return NewSQLCommand(ctx, sqlLogger, rn.RefID, format, expression, cfg.SQLExpressionCellLimit, cfg.SQLExpressionOutputCellLimit, cfg.SQLExpressionTimeout)
 | 
						||
}
 | 
						||
 | 
						||
// NeedsVars returns the variable names (refIds) that are dependencies
 | 
						||
// to execute the command and allows the command to fulfill the Command interface.
 | 
						||
func (gr *SQLCommand) NeedsVars() []string {
 | 
						||
	return gr.varsToQuery
 | 
						||
}
 | 
						||
 | 
						||
// Execute runs the command and returns the results if successful.
 | 
						||
// If there is an error, it will set Results.Error and return (the return from the func should never error).
 | 
						||
func (gr *SQLCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer, metrics *metrics.ExprMetrics) (mathexp.Results, error) {
 | 
						||
	_, span := tracer.Start(ctx, "SSE.ExecuteSQL")
 | 
						||
	start := time.Now()
 | 
						||
	tc := int64(0)
 | 
						||
	rsp := mathexp.Results{}
 | 
						||
	errorType := "none"
 | 
						||
 | 
						||
	defer func() {
 | 
						||
		duration := float64(time.Since(start).Milliseconds())
 | 
						||
		statusLabel := "ok"
 | 
						||
		if rsp.Error != nil {
 | 
						||
			e := &sql.ErrorWithCategory{}
 | 
						||
			if errors.As(rsp.Error, &e) {
 | 
						||
				errorType = e.Category()
 | 
						||
			} else {
 | 
						||
				errorType = "unknown"
 | 
						||
			}
 | 
						||
			statusLabel = "error"
 | 
						||
			span.AddEvent("exception", trace.WithAttributes(
 | 
						||
				semconv.ExceptionType(errorType),
 | 
						||
				semconv.ExceptionMessage(rsp.Error.Error()),
 | 
						||
			))
 | 
						||
			span.SetAttributes(attribute.String("error.category", errorType))
 | 
						||
			span.SetStatus(codes.Error, errorType)
 | 
						||
			gr.logger.Error("SQL command execution failed", "error", rsp.Error.Error(), "error_type", errorType)
 | 
						||
		}
 | 
						||
		span.End()
 | 
						||
 | 
						||
		// --- Exemplar labels from the current span ---
 | 
						||
		sc := span.SpanContext()
 | 
						||
		var ex prometheus.Labels
 | 
						||
		if sc.IsValid() {
 | 
						||
			ex = prometheus.Labels{
 | 
						||
				"trace_id": sc.TraceID().String(),
 | 
						||
				"span_id":  sc.SpanID().String(),
 | 
						||
			}
 | 
						||
		}
 | 
						||
 | 
						||
		// --- Counter with exemplar (if supported) ---
 | 
						||
		cnt := metrics.SqlCommandCount.WithLabelValues(statusLabel, errorType)
 | 
						||
		if ex != nil {
 | 
						||
			if ce, ok := cnt.(prometheus.ExemplarAdder); ok {
 | 
						||
				ce.AddWithExemplar(1, ex)
 | 
						||
			} else {
 | 
						||
				cnt.Inc()
 | 
						||
			}
 | 
						||
		} else {
 | 
						||
			cnt.Inc()
 | 
						||
		}
 | 
						||
 | 
						||
		// --- Duration histogram with exemplar (if supported) ---
 | 
						||
		obs := metrics.SqlCommandDuration.WithLabelValues(statusLabel)
 | 
						||
		if ex != nil {
 | 
						||
			if eo, ok := obs.(prometheus.ExemplarObserver); ok {
 | 
						||
				eo.ObserveWithExemplar(duration, ex)
 | 
						||
			} else {
 | 
						||
				obs.Observe(duration)
 | 
						||
			}
 | 
						||
		} else {
 | 
						||
			obs.Observe(duration)
 | 
						||
		}
 | 
						||
 | 
						||
		// --- Cell count histogram with exemplar (if supported) ---
 | 
						||
		obsCells := metrics.SqlCommandCellCount.WithLabelValues(statusLabel)
 | 
						||
		if ex != nil {
 | 
						||
			if eo, ok := obsCells.(prometheus.ExemplarObserver); ok {
 | 
						||
				eo.ObserveWithExemplar(float64(tc), ex)
 | 
						||
			} else {
 | 
						||
				obsCells.Observe(float64(tc))
 | 
						||
			}
 | 
						||
		} else {
 | 
						||
			obsCells.Observe(float64(tc))
 | 
						||
		}
 | 
						||
	}()
 | 
						||
 | 
						||
	allFrames := []*data.Frame{}
 | 
						||
	for _, ref := range gr.varsToQuery {
 | 
						||
		results, ok := vars[ref]
 | 
						||
		if !ok {
 | 
						||
			gr.logger.Warn("no results found for", "ref", ref)
 | 
						||
			continue
 | 
						||
		}
 | 
						||
		frames := results.Values.AsDataFrames(ref)
 | 
						||
		allFrames = append(allFrames, frames...)
 | 
						||
	}
 | 
						||
 | 
						||
	tc = totalCells(allFrames)
 | 
						||
 | 
						||
	// limit of 0 or less means no limit (following convention)
 | 
						||
	if gr.inputLimit > 0 && tc > gr.inputLimit {
 | 
						||
		rsp.Error = sql.MakeInputLimitExceededError(gr.refID, gr.inputLimit)
 | 
						||
		return rsp, nil
 | 
						||
	}
 | 
						||
 | 
						||
	gr.logger.Debug("Executing query", "query", gr.query, "frames", len(allFrames))
 | 
						||
 | 
						||
	db := sql.DB{}
 | 
						||
	frame, err := db.QueryFrames(ctx, tracer, gr.refID, gr.query, allFrames, sql.WithMaxOutputCells(gr.outputLimit), sql.WithTimeout(gr.timeout))
 | 
						||
	if err != nil {
 | 
						||
		rsp.Error = err
 | 
						||
		return rsp, nil
 | 
						||
	}
 | 
						||
 | 
						||
	gr.logger.Debug("Done Executing query", "query", gr.query, "rows", frame.Rows())
 | 
						||
 | 
						||
	if frame.Rows() == 0 {
 | 
						||
		rsp.Values = mathexp.Values{
 | 
						||
			mathexp.NoData{Frame: frame},
 | 
						||
		}
 | 
						||
		return rsp, nil
 | 
						||
	}
 | 
						||
 | 
						||
	switch gr.format {
 | 
						||
	case "alerting":
 | 
						||
		numberSet, err := extractNumberSetFromSQLForAlerting(frame)
 | 
						||
		if err != nil {
 | 
						||
			rsp.Error = err
 | 
						||
			return rsp, nil
 | 
						||
		}
 | 
						||
		vals := make([]mathexp.Value, 0, len(numberSet))
 | 
						||
		for i := range numberSet {
 | 
						||
			vals = append(vals, numberSet[i])
 | 
						||
		}
 | 
						||
		rsp.Values = vals
 | 
						||
 | 
						||
	default:
 | 
						||
		rsp.Values = mathexp.Values{
 | 
						||
			mathexp.TableData{Frame: frame},
 | 
						||
		}
 | 
						||
	}
 | 
						||
	return rsp, nil
 | 
						||
}
 | 
						||
 | 
						||
func (gr *SQLCommand) Type() string {
 | 
						||
	return TypeSQL.String()
 | 
						||
}
 | 
						||
 | 
						||
func totalCells(frames []*data.Frame) (total int64) {
 | 
						||
	for _, frame := range frames {
 | 
						||
		if frame != nil {
 | 
						||
			// Calculate cells as rows × columns
 | 
						||
			rows := int64(frame.Rows())
 | 
						||
			cols := int64(len(frame.Fields))
 | 
						||
			total += rows * cols
 | 
						||
		}
 | 
						||
	}
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// extractNumberSetFromSQLForAlerting converts a data frame produced by a SQL expression
 | 
						||
// into a slice of mathexp.Number values for use in alerting.
 | 
						||
//
 | 
						||
// This function enforces strict semantics: each row must have exactly one numeric value
 | 
						||
// and a unique label set. If any label set appears more than once, an error is returned.
 | 
						||
//
 | 
						||
// It is the responsibility of the SQL query to ensure uniqueness — for example, by
 | 
						||
// applying GROUP BY or aggregation clauses. This function will not deduplicate rows;
 | 
						||
// it will reject the entire input if any duplicates are present.
 | 
						||
//
 | 
						||
// Returns an error if:
 | 
						||
//   - No numeric field is found.
 | 
						||
//   - More than one numeric field exists.
 | 
						||
//   - Any label set appears more than once.
 | 
						||
func extractNumberSetFromSQLForAlerting(frame *data.Frame) ([]mathexp.Number, error) {
 | 
						||
	var (
 | 
						||
		numericField   *data.Field
 | 
						||
		numericFieldIx int
 | 
						||
	)
 | 
						||
 | 
						||
	// Find the only numeric field
 | 
						||
	for i, f := range frame.Fields {
 | 
						||
		if f.Type().Numeric() {
 | 
						||
			if numericField != nil {
 | 
						||
				return nil, fmt.Errorf("expected exactly one numeric field, but found multiple")
 | 
						||
			}
 | 
						||
			numericField = f
 | 
						||
			numericFieldIx = i
 | 
						||
		}
 | 
						||
	}
 | 
						||
	if numericField == nil {
 | 
						||
		return nil, fmt.Errorf("no numeric field found in frame")
 | 
						||
	}
 | 
						||
 | 
						||
	type row struct {
 | 
						||
		value  float64
 | 
						||
		labels data.Labels
 | 
						||
	}
 | 
						||
	rows := make([]row, 0, frame.Rows())
 | 
						||
	counts := map[data.Fingerprint]int{}
 | 
						||
	labelMap := map[data.Fingerprint]string{}
 | 
						||
 | 
						||
	for i := 0; i < frame.Rows(); i++ {
 | 
						||
		val, err := numericField.FloatAt(i)
 | 
						||
		if err != nil {
 | 
						||
			return nil, fmt.Errorf("failed to read numeric value at row %d: %w", i, err)
 | 
						||
		}
 | 
						||
 | 
						||
		labels := data.Labels{}
 | 
						||
		for j, f := range frame.Fields {
 | 
						||
			if j == numericFieldIx || (f.Type() != data.FieldTypeString && f.Type() != data.FieldTypeNullableString) {
 | 
						||
				continue
 | 
						||
			}
 | 
						||
 | 
						||
			val := f.At(i)
 | 
						||
			switch v := val.(type) {
 | 
						||
			case *string:
 | 
						||
				if v != nil {
 | 
						||
					labels[f.Name] = *v
 | 
						||
				}
 | 
						||
			case string:
 | 
						||
				labels[f.Name] = v
 | 
						||
			}
 | 
						||
		}
 | 
						||
 | 
						||
		fp := labels.Fingerprint()
 | 
						||
		counts[fp]++
 | 
						||
		labelMap[fp] = labels.String()
 | 
						||
 | 
						||
		rows = append(rows, row{value: val, labels: labels})
 | 
						||
	}
 | 
						||
 | 
						||
	// Check for any duplicates
 | 
						||
	duplicates := make([]string, 0)
 | 
						||
	for fp, count := range counts {
 | 
						||
		if count > 1 {
 | 
						||
			duplicates = append(duplicates, labelMap[fp])
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	if len(duplicates) > 0 {
 | 
						||
		return nil, sql.MakeDuplicateStringColumnError(duplicates)
 | 
						||
	}
 | 
						||
 | 
						||
	// Build final result
 | 
						||
	numbers := make([]mathexp.Number, 0, len(rows))
 | 
						||
	for _, r := range rows {
 | 
						||
		n := mathexp.NewNumber(numericField.Name, r.labels)
 | 
						||
		n.Frame.Fields[0].Config = numericField.Config
 | 
						||
		n.SetValue(&r.value)
 | 
						||
		numbers = append(numbers, n)
 | 
						||
	}
 | 
						||
 | 
						||
	return numbers, nil
 | 
						||
}
 | 
						||
 | 
						||
// handleSqlInput normalizes input DataFrames into a single dataframe with no labels so it can represent a table for use with SQL expressions.
 | 
						||
//
 | 
						||
// It handles three cases:
 | 
						||
//  1. If the input declares a supported time series or numeric kind in the wide or multi format (via FrameMeta.Type), it converts to a full-long formatted table using ConvertToFullLong.
 | 
						||
//  2. If the input is a single frame (no labels, no declared type), it passes through as-is.
 | 
						||
//  3. If the input has multiple frames or label metadata but lacks a supported type, it returns an error.
 | 
						||
//
 | 
						||
// The returned bool indicates if the input was (attempted to be) converted or passed through as-is.
 | 
						||
func handleSqlInput(ctx context.Context, tracer trace.Tracer, refID string, forRefIDs map[string]struct{}, dsType string, dataFrames data.Frames) (mathexp.Results, bool) {
 | 
						||
	_, span := tracer.Start(ctx, "SSE.HandleConvertSQLInput")
 | 
						||
	start := time.Now()
 | 
						||
	var result mathexp.Results
 | 
						||
	errorType := "none"
 | 
						||
	var metaType data.FrameType
 | 
						||
 | 
						||
	defer func() {
 | 
						||
		duration := float64(time.Since(start).Milliseconds())
 | 
						||
		statusLabel := "ok"
 | 
						||
		if result.Error != nil {
 | 
						||
			statusLabel = "error"
 | 
						||
		}
 | 
						||
		dataType := categorizeFrameInputType(dataFrames)
 | 
						||
		span.SetAttributes(
 | 
						||
			attribute.String("status", statusLabel),
 | 
						||
			attribute.Float64("duration", duration),
 | 
						||
			attribute.String("data.type", dataType),
 | 
						||
			attribute.String("datasource.type", dsType),
 | 
						||
		)
 | 
						||
 | 
						||
		if result.Error != nil {
 | 
						||
			e := &sql.ErrorWithCategory{}
 | 
						||
			if errors.As(result.Error, &e) {
 | 
						||
				errorType = e.Category()
 | 
						||
			} else {
 | 
						||
				errorType = "unknown"
 | 
						||
			}
 | 
						||
			span.AddEvent("exception", trace.WithAttributes(
 | 
						||
				semconv.ExceptionType(errorType),
 | 
						||
				semconv.ExceptionMessage(result.Error.Error()),
 | 
						||
			))
 | 
						||
			span.SetAttributes(attribute.String("error.category", errorType))
 | 
						||
			span.SetStatus(codes.Error, errorType)
 | 
						||
		}
 | 
						||
		span.End()
 | 
						||
	}()
 | 
						||
 | 
						||
	if len(dataFrames) == 0 {
 | 
						||
		return mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, false
 | 
						||
	}
 | 
						||
 | 
						||
	first := dataFrames[0]
 | 
						||
 | 
						||
	// Single Frame no data case
 | 
						||
	// Note: In the case of a support Frame Type, we may want to return the matching schema
 | 
						||
	// with no rows (e.g. include the `__value__` column). But not sure about this at this time.
 | 
						||
	if len(dataFrames) == 1 && len(first.Fields) == 0 {
 | 
						||
		result.Values = mathexp.Values{
 | 
						||
			mathexp.TableData{Frame: first},
 | 
						||
		}
 | 
						||
 | 
						||
		return result, false
 | 
						||
	}
 | 
						||
 | 
						||
	if first.Meta != nil {
 | 
						||
		metaType = first.Meta.Type
 | 
						||
	}
 | 
						||
 | 
						||
	if supportedToLongConversion(metaType) {
 | 
						||
		convertedFrames, err := ConvertToFullLong(dataFrames)
 | 
						||
		if err != nil {
 | 
						||
			result.Error = sql.MakeInputConvertError(err, refID, forRefIDs, dsType)
 | 
						||
			return result, true
 | 
						||
		}
 | 
						||
 | 
						||
		if len(convertedFrames) == 0 {
 | 
						||
			result.Error = fmt.Errorf("conversion succeeded but returned no frames")
 | 
						||
			return result, true
 | 
						||
		}
 | 
						||
 | 
						||
		result.Values = mathexp.Values{
 | 
						||
			mathexp.TableData{Frame: convertedFrames[0]},
 | 
						||
		}
 | 
						||
 | 
						||
		return result, true
 | 
						||
	}
 | 
						||
 | 
						||
	// If we don't have a supported type for conversion, see if we can pass through as a table (no labels, and only a single frame)
 | 
						||
	var frameTypeIssue string
 | 
						||
	if metaType == "" {
 | 
						||
		frameTypeIssue = "is missing the data type (frame.meta.type)"
 | 
						||
	} else {
 | 
						||
		frameTypeIssue = fmt.Sprintf("has an unsupported data type [%s]", metaType)
 | 
						||
	}
 | 
						||
 | 
						||
	// If meta.type is not supported, but there are labels or more than 1 frame error
 | 
						||
	if len(dataFrames) > 1 {
 | 
						||
		result.Error = sql.MakeInputConvertError(fmt.Errorf("can not convert because the response %s and has more than one dataframe that can not be automatically mapped to a single table", frameTypeIssue), refID, forRefIDs, dsType)
 | 
						||
		return result, false
 | 
						||
	}
 | 
						||
	for _, frame := range dataFrames {
 | 
						||
		for _, field := range frame.Fields {
 | 
						||
			if len(field.Labels) > 0 {
 | 
						||
				result.Error = sql.MakeInputConvertError(fmt.Errorf("can not convert because the response %s and has labels in the response that can not be mapped to a table", frameTypeIssue), refID, forRefIDs, dsType)
 | 
						||
				return result, false
 | 
						||
			}
 | 
						||
		}
 | 
						||
	}
 | 
						||
 | 
						||
	// Can pass through as table without conversion
 | 
						||
	result.Values = mathexp.Values{
 | 
						||
		mathexp.TableData{Frame: first},
 | 
						||
	}
 | 
						||
	return result, false
 | 
						||
}
 | 
						||
 | 
						||
func categorizeFrameInputType(dataFrames data.Frames) string {
 | 
						||
	switch {
 | 
						||
	case len(dataFrames) == 0:
 | 
						||
		return "missing"
 | 
						||
	case dataFrames[0].Meta == nil:
 | 
						||
		return "missing"
 | 
						||
	case dataFrames[0].Meta.Type == "":
 | 
						||
		return "missing"
 | 
						||
	case dataFrames[0].Meta.Type.IsKnownType():
 | 
						||
		return string(dataFrames[0].Meta.Type)
 | 
						||
	default:
 | 
						||
		return "unknown"
 | 
						||
	}
 | 
						||
}
 |