2024-02-28 05:16:00 +08:00
package expr
import (
"context"
"errors"
"fmt"
"time"
2025-08-13 23:20:14 +08:00
"github.com/grafana/grafana-plugin-sdk-go/backend"
2024-02-28 05:16:00 +08:00
"github.com/grafana/grafana-plugin-sdk-go/data"
2025-08-25 23:13:42 +08:00
"go.opentelemetry.io/otel/attribute"
2025-08-07 20:43:39 +08:00
"go.opentelemetry.io/otel/codes"
2025-08-25 23:13:42 +08:00
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
2024-02-28 05:16:00 +08:00
2025-08-25 23:13:42 +08:00
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
2024-02-28 05:16:00 +08:00
"github.com/grafana/grafana/pkg/expr/mathexp"
2025-04-11 02:51:44 +08:00
"github.com/grafana/grafana/pkg/expr/metrics"
2024-02-28 05:16:00 +08:00
"github.com/grafana/grafana/pkg/expr/sql"
"github.com/grafana/grafana/pkg/infra/tracing"
2025-05-14 03:22:20 +08:00
"github.com/grafana/grafana/pkg/setting"
2024-02-28 05:16:00 +08:00
)
2025-08-25 23:13:42 +08:00
const SQLLoggerName = "expr.sql"
2025-03-22 06:47:00 +08:00
2024-02-28 05:16:00 +08:00
// SQLCommand is an expression to run SQL over results
type SQLCommand struct {
query string
varsToQuery [ ] string
refID string
2025-05-14 03:22:20 +08:00
format string
inputLimit int64
outputLimit int64
timeout time . Duration
2025-08-25 23:13:42 +08:00
logger log . Logger
2024-02-28 05:16:00 +08:00
}
// NewSQLCommand creates a new SQLCommand.
2025-08-25 23:13:42 +08:00
func NewSQLCommand ( ctx context . Context , logger log . Logger , refID , format , rawSQL string , intputLimit , outputLimit int64 , timeout time . Duration ) ( * SQLCommand , error ) {
sqlLogger := backend . NewLoggerWith ( "logger" , SQLLoggerName ) . FromContext ( ctx )
2024-02-28 05:16:00 +08:00
if rawSQL == "" {
2025-08-25 23:13:42 +08:00
return nil , sql . MakeErrEmptyQuery ( refID )
2024-02-28 05:16:00 +08:00
}
2025-08-13 23:20:14 +08:00
tables , err := sql . TablesList ( ctx , rawSQL )
2024-02-28 05:16:00 +08:00
if err != nil {
2025-08-25 23:13:42 +08:00
sqlLogger . Warn ( "invalid sql query" , "sql" , rawSQL , "error" , err )
return nil , sql . MakeErrInvalidQuery ( refID , err )
2024-02-28 05:16:00 +08:00
}
2024-05-23 05:18:46 +08:00
if len ( tables ) == 0 {
2025-08-25 23:13:42 +08:00
sqlLogger . Warn ( "no tables found in SQL query" , "sql" , rawSQL )
2024-05-23 05:18:46 +08:00
}
if tables != nil {
2025-08-25 23:13:42 +08:00
sqlLogger . Debug ( "REF tables" , "tables" , tables , "sql" , rawSQL )
2024-05-23 05:18:46 +08:00
}
2025-03-12 01:14:33 +08:00
2024-02-28 05:16:00 +08:00
return & SQLCommand {
query : rawSQL ,
varsToQuery : tables ,
refID : refID ,
2025-05-14 03:22:20 +08:00
inputLimit : intputLimit ,
outputLimit : outputLimit ,
timeout : timeout ,
2025-04-02 21:39:36 +08:00
format : format ,
2025-08-25 23:13:42 +08:00
logger : sqlLogger ,
2024-02-28 05:16:00 +08:00
} , nil
}
// UnmarshalSQLCommand creates a SQLCommand from Grafana's frontend query.
2025-08-13 23:20:14 +08:00
func UnmarshalSQLCommand ( ctx context . Context , rn * rawNode , cfg * setting . Cfg ) ( * SQLCommand , error ) {
2025-08-25 23:13:42 +08:00
sqlLogger := backend . NewLoggerWith ( "logger" , SQLLoggerName ) . FromContext ( ctx )
2024-02-28 05:16:00 +08:00
if rn . TimeRange == nil {
2025-08-25 23:13:42 +08:00
sqlLogger . Error ( "time range must be specified for refID" , "refID" , rn . RefID )
2024-02-28 05:16:00 +08:00
return nil , fmt . Errorf ( "time range must be specified for refID %s" , rn . RefID )
}
expressionRaw , ok := rn . Query [ "expression" ]
if ! ok {
2025-08-13 23:20:14 +08:00
sqlLogger . Error ( "no expression in the query" , "query" , rn . Query )
2024-02-28 05:16:00 +08:00
return nil , errors . New ( "no expression in the query" )
}
expression , ok := expressionRaw . ( string )
if ! ok {
2025-08-13 23:20:14 +08:00
sqlLogger . Error ( "expected sql expression to be type string" , "expression" , expressionRaw )
2024-02-28 05:16:00 +08:00
return nil , fmt . Errorf ( "expected sql expression to be type string, but got type %T" , expressionRaw )
}
2025-08-28 00:08:25 +08:00
if cfg . SQLExpressionQueryLengthLimit > 0 && len ( expression ) > int ( cfg . SQLExpressionQueryLengthLimit ) {
return nil , sql . MakeQueryTooLongError ( rn . RefID , cfg . SQLExpressionQueryLengthLimit )
}
2025-04-02 21:39:36 +08:00
formatRaw := rn . Query [ "format" ]
format , _ := formatRaw . ( string )
2025-08-25 23:13:42 +08:00
return NewSQLCommand ( ctx , sqlLogger , rn . RefID , format , expression , cfg . SQLExpressionCellLimit , cfg . SQLExpressionOutputCellLimit , cfg . SQLExpressionTimeout )
2024-02-28 05:16:00 +08:00
}
// NeedsVars returns the variable names (refIds) that are dependencies
// to execute the command and allows the command to fulfill the Command interface.
func ( gr * SQLCommand ) NeedsVars ( ) [ ] string {
return gr . varsToQuery
}
2025-06-17 21:23:56 +08:00
// Execute runs the command and returns the results if successful.
// If there is an error, it will set Results.Error and return (the return from the func should never error).
func ( gr * SQLCommand ) Execute ( ctx context . Context , now time . Time , vars mathexp . Vars , tracer tracing . Tracer , metrics * metrics . ExprMetrics ) ( mathexp . Results , error ) {
2024-02-28 05:16:00 +08:00
_ , span := tracer . Start ( ctx , "SSE.ExecuteSQL" )
2025-04-11 02:51:44 +08:00
start := time . Now ( )
tc := int64 ( 0 )
2025-06-17 21:23:56 +08:00
rsp := mathexp . Results { }
2025-08-25 23:13:42 +08:00
errorType := "none"
2025-04-11 02:51:44 +08:00
defer func ( ) {
2025-06-17 21:23:56 +08:00
duration := float64 ( time . Since ( start ) . Milliseconds ( ) )
2025-04-11 02:51:44 +08:00
statusLabel := "ok"
2025-06-17 21:23:56 +08:00
if rsp . Error != nil {
2025-08-25 23:13:42 +08:00
e := & sql . ErrorWithCategory { }
if errors . As ( rsp . Error , & e ) {
errorType = e . Category ( )
} else {
errorType = "unknown"
}
2025-04-11 02:51:44 +08:00
statusLabel = "error"
2025-08-25 23:13:42 +08:00
span . AddEvent ( "exception" , trace . WithAttributes (
semconv . ExceptionType ( errorType ) ,
semconv . ExceptionMessage ( rsp . Error . Error ( ) ) ,
) )
span . SetAttributes ( attribute . String ( "error.category" , errorType ) )
span . SetStatus ( codes . Error , errorType )
gr . logger . Error ( "SQL command execution failed" , "error" , rsp . Error . Error ( ) , "error_type" , errorType )
2025-04-11 02:51:44 +08:00
}
2025-08-07 20:43:39 +08:00
span . End ( )
2025-06-17 21:23:56 +08:00
2025-08-25 23:13:42 +08:00
metrics . SqlCommandCount . WithLabelValues ( statusLabel , errorType ) . Inc ( )
2025-04-11 02:51:44 +08:00
metrics . SqlCommandDuration . WithLabelValues ( statusLabel ) . Observe ( duration )
metrics . SqlCommandCellCount . WithLabelValues ( statusLabel ) . Observe ( float64 ( tc ) )
} ( )
2024-02-28 05:16:00 +08:00
allFrames := [ ] * data . Frame { }
for _ , ref := range gr . varsToQuery {
2024-05-02 20:43:20 +08:00
results , ok := vars [ ref ]
if ! ok {
2025-08-25 23:13:42 +08:00
gr . logger . Warn ( "no results found for" , "ref" , ref )
2024-05-02 20:43:20 +08:00
continue
}
2024-02-28 05:16:00 +08:00
frames := results . Values . AsDataFrames ( ref )
allFrames = append ( allFrames , frames ... )
}
2025-04-11 02:51:44 +08:00
tc = totalCells ( allFrames )
2025-03-12 01:14:33 +08:00
// limit of 0 or less means no limit (following convention)
2025-05-14 03:22:20 +08:00
if gr . inputLimit > 0 && tc > gr . inputLimit {
2025-08-25 23:13:42 +08:00
rsp . Error = sql . MakeInputLimitExceededError ( gr . refID , gr . inputLimit )
2025-06-17 21:23:56 +08:00
return rsp , nil
2025-03-12 01:14:33 +08:00
}
2024-05-23 05:18:46 +08:00
2025-08-25 23:13:42 +08:00
gr . logger . Debug ( "Executing query" , "query" , gr . query , "frames" , len ( allFrames ) )
2025-03-12 01:14:33 +08:00
db := sql . DB { }
2025-05-14 03:22:20 +08:00
frame , err := db . QueryFrames ( ctx , tracer , gr . refID , gr . query , allFrames , sql . WithMaxOutputCells ( gr . outputLimit ) , sql . WithTimeout ( gr . timeout ) )
2024-02-28 05:16:00 +08:00
if err != nil {
rsp . Error = err
return rsp , nil
}
2025-08-13 23:20:14 +08:00
2025-08-25 23:13:42 +08:00
gr . logger . Debug ( "Done Executing query" , "query" , gr . query , "rows" , frame . Rows ( ) )
2024-02-28 05:16:00 +08:00
if frame . Rows ( ) == 0 {
rsp . Values = mathexp . Values {
mathexp . NoData { Frame : frame } ,
}
2025-02-06 20:27:28 +08:00
return rsp , nil
2024-02-28 05:16:00 +08:00
}
2025-04-02 21:39:36 +08:00
switch gr . format {
case "alerting" :
numberSet , err := extractNumberSetFromSQLForAlerting ( frame )
if err != nil {
rsp . Error = err
return rsp , nil
}
vals := make ( [ ] mathexp . Value , 0 , len ( numberSet ) )
for i := range numberSet {
vals = append ( vals , numberSet [ i ] )
}
rsp . Values = vals
2024-02-28 05:16:00 +08:00
2025-04-02 21:39:36 +08:00
default :
rsp . Values = mathexp . Values {
mathexp . TableData { Frame : frame } ,
}
}
2024-02-28 05:16:00 +08:00
return rsp , nil
}
2024-03-19 22:00:03 +08:00
func ( gr * SQLCommand ) Type ( ) string {
return TypeSQL . String ( )
}
2025-03-12 01:14:33 +08:00
func totalCells ( frames [ ] * data . Frame ) ( total int64 ) {
for _ , frame := range frames {
if frame != nil {
// Calculate cells as rows × columns
rows := int64 ( frame . Rows ( ) )
cols := int64 ( len ( frame . Fields ) )
total += rows * cols
}
}
return
}
2025-04-02 21:39:36 +08:00
2025-04-04 22:00:30 +08:00
// extractNumberSetFromSQLForAlerting converts a data frame produced by a SQL expression
// into a slice of mathexp.Number values for use in alerting.
//
// This function enforces strict semantics: each row must have exactly one numeric value
// and a unique label set. If any label set appears more than once, an error is returned.
//
// It is the responsibility of the SQL query to ensure uniqueness — for example, by
// applying GROUP BY or aggregation clauses. This function will not deduplicate rows;
// it will reject the entire input if any duplicates are present.
//
// Returns an error if:
// - No numeric field is found.
// - More than one numeric field exists.
// - Any label set appears more than once.
2025-04-02 21:39:36 +08:00
func extractNumberSetFromSQLForAlerting ( frame * data . Frame ) ( [ ] mathexp . Number , error ) {
var (
numericField * data . Field
numericFieldIx int
)
// Find the only numeric field
for i , f := range frame . Fields {
if f . Type ( ) . Numeric ( ) {
if numericField != nil {
return nil , fmt . Errorf ( "expected exactly one numeric field, but found multiple" )
}
numericField = f
numericFieldIx = i
}
}
if numericField == nil {
return nil , fmt . Errorf ( "no numeric field found in frame" )
}
2025-04-04 22:00:30 +08:00
type row struct {
value float64
labels data . Labels
}
rows := make ( [ ] row , 0 , frame . Rows ( ) )
counts := map [ data . Fingerprint ] int { }
labelMap := map [ data . Fingerprint ] string { }
2025-04-02 21:39:36 +08:00
for i := 0 ; i < frame . Rows ( ) ; i ++ {
val , err := numericField . FloatAt ( i )
if err != nil {
return nil , fmt . Errorf ( "failed to read numeric value at row %d: %w" , i , err )
}
labels := data . Labels { }
for j , f := range frame . Fields {
if j == numericFieldIx || ( f . Type ( ) != data . FieldTypeString && f . Type ( ) != data . FieldTypeNullableString ) {
continue
}
val := f . At ( i )
switch v := val . ( type ) {
case * string :
if v != nil {
labels [ f . Name ] = * v
}
case string :
labels [ f . Name ] = v
}
}
2025-04-04 22:00:30 +08:00
fp := labels . Fingerprint ( )
counts [ fp ] ++
labelMap [ fp ] = labels . String ( )
rows = append ( rows , row { value : val , labels : labels } )
}
// Check for any duplicates
duplicates := make ( [ ] string , 0 )
for fp , count := range counts {
if count > 1 {
duplicates = append ( duplicates , labelMap [ fp ] )
}
}
if len ( duplicates ) > 0 {
2025-08-25 23:13:42 +08:00
return nil , sql . MakeDuplicateStringColumnError ( duplicates )
2025-04-04 22:00:30 +08:00
}
// Build final result
numbers := make ( [ ] mathexp . Number , 0 , len ( rows ) )
for _ , r := range rows {
n := mathexp . NewNumber ( numericField . Name , r . labels )
2025-04-02 21:39:36 +08:00
n . Frame . Fields [ 0 ] . Config = numericField . Config
2025-04-04 22:00:30 +08:00
n . SetValue ( & r . value )
numbers = append ( numbers , n )
2025-04-02 21:39:36 +08:00
}
return numbers , nil
}
2025-08-25 23:13:42 +08:00
// handleSqlInput normalizes input DataFrames into a single dataframe with no labels so it can represent a table for use with SQL expressions.
//
// It handles three cases:
// 1. If the input declares a supported time series or numeric kind in the wide or multi format (via FrameMeta.Type), it converts to a full-long formatted table using ConvertToFullLong.
// 2. If the input is a single frame (no labels, no declared type), it passes through as-is.
// 3. If the input has multiple frames or label metadata but lacks a supported type, it returns an error.
//
// The returned bool indicates if the input was (attempted to be) converted or passed through as-is.
func handleSqlInput ( ctx context . Context , tracer trace . Tracer , refID string , forRefIDs map [ string ] struct { } , dsType string , dataFrames data . Frames ) ( mathexp . Results , bool ) {
_ , span := tracer . Start ( ctx , "SSE.HandleConvertSQLInput" )
start := time . Now ( )
var result mathexp . Results
errorType := "none"
var metaType data . FrameType
defer func ( ) {
duration := float64 ( time . Since ( start ) . Milliseconds ( ) )
statusLabel := "ok"
if result . Error != nil {
statusLabel = "error"
}
dataType := categorizeFrameInputType ( dataFrames )
span . SetAttributes (
attribute . String ( "status" , statusLabel ) ,
attribute . Float64 ( "duration" , duration ) ,
attribute . String ( "data.type" , dataType ) ,
attribute . String ( "datasource.type" , dsType ) ,
)
if result . Error != nil {
e := & sql . ErrorWithCategory { }
if errors . As ( result . Error , & e ) {
errorType = e . Category ( )
} else {
errorType = "unknown"
}
span . AddEvent ( "exception" , trace . WithAttributes (
semconv . ExceptionType ( errorType ) ,
semconv . ExceptionMessage ( result . Error . Error ( ) ) ,
) )
span . SetAttributes ( attribute . String ( "error.category" , errorType ) )
span . SetStatus ( codes . Error , errorType )
}
span . End ( )
} ( )
if len ( dataFrames ) == 0 {
return mathexp . Results { Values : mathexp . Values { mathexp . NewNoData ( ) } } , false
}
first := dataFrames [ 0 ]
// Single Frame no data case
// Note: In the case of a support Frame Type, we may want to return the matching schema
// with no rows (e.g. include the `__value__` column). But not sure about this at this time.
if len ( dataFrames ) == 1 && len ( first . Fields ) == 0 {
result . Values = mathexp . Values {
mathexp . TableData { Frame : first } ,
}
return result , false
}
if first . Meta != nil {
metaType = first . Meta . Type
}
if supportedToLongConversion ( metaType ) {
convertedFrames , err := ConvertToFullLong ( dataFrames )
if err != nil {
result . Error = sql . MakeInputConvertError ( err , refID , forRefIDs , dsType )
}
if len ( convertedFrames ) == 0 {
result . Error = fmt . Errorf ( "conversion succeeded but returned no frames" )
return result , true
}
result . Values = mathexp . Values {
mathexp . TableData { Frame : convertedFrames [ 0 ] } ,
}
return result , true
}
// If we don't have a supported type for conversion, see if we can pass through as a table (no labels, and only a single frame)
var frameTypeIssue string
if metaType == "" {
frameTypeIssue = "is missing the data type (frame.meta.type)"
} else {
frameTypeIssue = fmt . Sprintf ( "has an unsupported data type [%s]" , metaType )
}
// If meta.type is not supported, but there are labels or more than 1 frame error
if len ( dataFrames ) > 1 {
result . Error = sql . MakeInputConvertError ( fmt . Errorf ( "can not convert because the response %s and has more than one dataframe that can not be automatically mapped to a single table" , frameTypeIssue ) , refID , forRefIDs , dsType )
return result , false
}
for _ , frame := range dataFrames {
for _ , field := range frame . Fields {
if len ( field . Labels ) > 0 {
result . Error = sql . MakeInputConvertError ( fmt . Errorf ( "can not convert because the response %s and has labels in the response that can not be mapped to a table" , frameTypeIssue ) , refID , forRefIDs , dsType )
return result , false
}
}
}
// Can pass through as table without conversion
result . Values = mathexp . Values {
mathexp . TableData { Frame : first } ,
}
return result , false
}
func categorizeFrameInputType ( dataFrames data . Frames ) string {
switch {
case len ( dataFrames ) == 0 :
return "missing"
case dataFrames [ 0 ] . Meta == nil :
return "missing"
case dataFrames [ 0 ] . Meta . Type == "" :
return "missing"
case dataFrames [ 0 ] . Meta . Type . IsKnownType ( ) :
return string ( dataFrames [ 0 ] . Meta . Type )
default :
return "unknown"
}
}