package expr

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter"
	data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"gonum.org/v1/gonum/graph/simple"

	"github.com/grafana/grafana/pkg/expr/classic"
	"github.com/grafana/grafana/pkg/expr/mathexp"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/datasources"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
)
2023-07-21 02:44:12 +08:00
// label that is used when all mathexp.Series have 0 labels to make them identifiable by labels. The value of this label is extracted from value field names
const nameLabelName = "__name__"
2021-05-07 21:16:21 +08:00
var (
logger = log . New ( "expr" )
)
// baseNode includes common properties used across DPNodes.
type baseNode struct {
	id    int64  // graph node id (the raw node's index, so execution order is stable)
	refID string // the query/expression refId this node corresponds to
}
// rawNode is the pre-parsed form of a single query or expression node as
// received in the request, before it is built into a DSNode or CMDNode.
type rawNode struct {
	RefID string `json:"refId"`
	// Query is the decoded JSON body of the query.
	Query map[string]any
	// QueryRaw is the raw JSON bytes of the query body.
	QueryRaw  []byte
	QueryType string
	TimeRange TimeRange
	// DataSource is the resolved datasource for this node; nil for expressions.
	DataSource *datasources.DataSource

	// We use this index as the id of the node graph so the order can remain during the stable sort of the dependency graph execution order.
	// Some data sources, such as cloud watch, have order dependencies between queries.
	idx int64
}
// getExpressionCommandTypeString extracts the "type" entry from a raw query
// map and returns it as a string. It returns an error when the entry is
// absent or is not a string.
func getExpressionCommandTypeString(rawQuery map[string]any) (string, error) {
	raw, found := rawQuery["type"]
	if !found {
		return "", errors.New("no expression command type in query")
	}
	switch v := raw.(type) {
	case string:
		return v, nil
	default:
		return "", fmt.Errorf("expected expression command type to be a string, got type %T", raw)
	}
}
func GetExpressionCommandType ( rawQuery map [ string ] any ) ( c CommandType , err error ) {
typeString , err := getExpressionCommandTypeString ( rawQuery )
if err != nil {
return c , err
2020-11-19 20:17:00 +08:00
}
return ParseCommandType ( typeString )
}
// String returns a string representation of the node (its refId). In
// particular for %v formatting in error messages.
func (b *baseNode) String() string {
	return b.refID
}
// CMDNode is a DPNode that holds an expression command.
type CMDNode struct {
	baseNode
	CMDType CommandType // the parsed expression command type (math, reduce, …)
	Command Command     // the executable command built from the raw query
}
// ID returns the id of the node so it can fulfill the gonum's graph Node interface.
func (b *baseNode) ID() int64 {
	return b.id
}
// RefID returns the refId of the node.
func (b *baseNode) RefID() string {
	return b.refID
}
// NodeType returns the data pipeline node type.
func (gn *CMDNode) NodeType() NodeType {
	return TypeCMDNode
}
// NeedsVars returns the refIDs of the nodes this node's command depends on.
func (gn *CMDNode) NeedsVars() []string {
	return gn.Command.NeedsVars()
}
// Execute runs the node and adds the results to vars. If the node requires
// other nodes they must have already been executed and their results must
// already be in vars.
func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
	return gn.Command.Execute(ctx, now, vars, s.tracer)
}
2025-03-12 01:14:33 +08:00
func buildCMDNode ( rn * rawNode , toggles featuremgmt . FeatureToggles , sqlExpressionCellLimit int64 ) ( * CMDNode , error ) {
2023-12-12 04:40:31 +08:00
commandType , err := GetExpressionCommandType ( rn . Query )
2020-11-19 20:17:00 +08:00
if err != nil {
2022-09-22 03:14:11 +08:00
return nil , fmt . Errorf ( "invalid command type in expression '%v': %w" , rn . RefID , err )
2020-11-19 20:17:00 +08:00
}
2025-02-06 20:27:28 +08:00
if commandType == TypeSQL {
if ! toggles . IsEnabledGlobally ( featuremgmt . FlagSqlExpressions ) {
return nil , fmt . Errorf ( "sql expressions are disabled" )
}
}
2020-11-19 20:17:00 +08:00
node := & CMDNode {
baseNode : baseNode {
2023-08-18 19:49:59 +08:00
id : rn . idx ,
2020-11-19 20:17:00 +08:00
refID : rn . RefID ,
} ,
2021-04-27 19:22:11 +08:00
CMDType : commandType ,
2020-11-19 20:17:00 +08:00
}
2024-02-17 08:59:11 +08:00
if toggles . IsEnabledGlobally ( featuremgmt . FlagExpressionParser ) {
rn . QueryType , err = getExpressionCommandTypeString ( rn . Query )
if err != nil {
return nil , err // should not happen because the command was parsed first thing
}
// NOTE: this structure of this is weird now, because it is targeting a structure
// where this is actually run in the root loop, however we want to verify the individual
// node parsing before changing the full tree parser
2024-03-05 00:22:56 +08:00
reader := NewExpressionQueryReader ( toggles )
iter , err := jsoniter . ParseBytes ( jsoniter . ConfigDefault , rn . QueryRaw )
2024-02-17 08:59:11 +08:00
if err != nil {
return nil , err
}
2024-03-09 00:12:59 +08:00
q , err := reader . ReadQuery ( data . NewDataQuery ( map [ string ] any {
"refId" : rn . RefID ,
"type" : rn . QueryType ,
} ) , iter )
2024-02-17 08:59:11 +08:00
if err != nil {
return nil , err
}
node . Command = q . Command
return node , err
}
2020-11-19 20:17:00 +08:00
switch commandType {
case TypeMath :
node . Command , err = UnmarshalMathCommand ( rn )
case TypeReduce :
node . Command , err = UnmarshalReduceCommand ( rn )
case TypeResample :
node . Command , err = UnmarshalResampleCommand ( rn )
2021-03-03 02:51:33 +08:00
case TypeClassicConditions :
node . Command , err = classic . UnmarshalConditionsCmd ( rn . Query , rn . RefID )
2022-09-26 22:05:44 +08:00
case TypeThreshold :
2023-10-10 22:51:50 +08:00
node . Command , err = UnmarshalThresholdCommand ( rn , toggles )
2024-02-28 05:16:00 +08:00
case TypeSQL :
2025-03-12 01:14:33 +08:00
node . Command , err = UnmarshalSQLCommand ( rn , sqlExpressionCellLimit )
2020-11-19 20:17:00 +08:00
default :
2022-09-22 03:14:11 +08:00
return nil , fmt . Errorf ( "expression command type '%v' in expression '%v' not implemented" , commandType , rn . RefID )
2020-11-19 20:17:00 +08:00
}
if err != nil {
2022-09-22 03:14:11 +08:00
return nil , fmt . Errorf ( "failed to parse expression '%v': %w" , rn . RefID , err )
2020-11-19 20:17:00 +08:00
}
return node , nil
}
// Defaults applied to datasource queries when the raw query does not
// specify intervalMs / maxDataPoints.
const (
	defaultIntervalMS = int64(64)
	defaultMaxDP      = int64(5000)
)
// DSNode is a DPNode that holds a datasource request.
type DSNode struct {
	baseNode

	query      json.RawMessage // raw JSON query body forwarded to the datasource
	datasource *datasources.DataSource

	orgID     int64
	queryType string

	timeRange  TimeRange
	intervalMS int64 // query interval in milliseconds
	maxDP      int64 // maximum number of data points to request

	request Request

	// isInputToSQLExpr marks this node as an input to a SQL expression,
	// which requires the response frames in Long format (see Execute).
	isInputToSQLExpr bool
}
2024-03-19 22:00:03 +08:00
func ( dn * DSNode ) String ( ) string {
if dn . datasource == nil {
return "unknown"
}
return dn . datasource . Type
}
// NodeType returns the data pipeline node type.
func (dn *DSNode) NodeType() NodeType {
	return TypeDatasourceNode
}
// NeedsVars returns the refIDs of nodes this node depends on. A datasource
// node never depends on other nodes, so the result is always empty.
func (dn *DSNode) NeedsVars() []string {
	return []string{}
}
// buildDSNode constructs a DSNode from a raw node and the incoming request,
// applying default interval/maxDataPoints when the raw query omits them.
// The dp graph argument is not referenced in this body — presumably the node
// is inserted into the graph by the caller; confirm before removing it.
func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (*DSNode, error) {
	if rn.TimeRange == nil {
		return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID)
	}

	encodedQuery, err := json.Marshal(rn.Query)
	if err != nil {
		return nil, err
	}

	dsNode := &DSNode{
		baseNode: baseNode{
			id:    rn.idx,
			refID: rn.RefID,
		},
		orgID:      req.OrgId,
		query:      json.RawMessage(encodedQuery),
		queryType:  rn.QueryType,
		intervalMS: defaultIntervalMS,
		maxDP:      defaultMaxDP,
		timeRange:  rn.TimeRange,
		request:    *req,
		datasource: rn.DataSource,
	}

	// Override the default interval when provided. JSON numbers decode into
	// map[string]any as float64, hence the float64 assertion.
	var floatIntervalMS float64
	if rawIntervalMS, ok := rn.Query["intervalMs"]; ok {
		if floatIntervalMS, ok = rawIntervalMS.(float64); !ok {
			return nil, fmt.Errorf("expected intervalMs to be an float64, got type %T for refId %v", rawIntervalMS, rn.RefID)
		}
		dsNode.intervalMS = int64(floatIntervalMS)
	}

	// Override the default max data points when provided (same float64 rule).
	var floatMaxDP float64
	if rawMaxDP, ok := rn.Query["maxDataPoints"]; ok {
		if floatMaxDP, ok = rawMaxDP.(float64); !ok {
			return nil, fmt.Errorf("expected maxDataPoints to be an float64, got type %T for refId %v", rawMaxDP, rn.RefID)
		}
		dsNode.maxDP = int64(floatMaxDP)
	}
	return dsNode, nil
}
// executeDSNodesGrouped groups datasource node queries by the datasource instance, and then sends them
// in a single request with one or more queries to the datasource. Results (or
// errors) for each node are written into vars keyed by refID.
func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service, nodes []*DSNode) {
	type dsKey struct {
		uid   string // in theory I think this all I need for the key, but rather be safe
		id    int64
		orgID int64
	}

	// Bucket the nodes by datasource instance so each instance gets one request.
	byDS := make(map[dsKey][]*DSNode)
	for _, node := range nodes {
		k := dsKey{id: node.datasource.ID, uid: node.datasource.UID, orgID: node.orgID}
		byDS[k] = append(byDS[k], node)
	}

	for _, nodeGroup := range byDS {
		// Anonymous function so the span's deferred End fires per group, not at
		// function exit.
		func() {
			ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
			defer span.End()
			firstNode := nodeGroup[0]
			pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource)
			if err != nil {
				// Mark every node in the group as failed so downstream consumers
				// of vars see an error result.
				for _, dn := range nodeGroup {
					vars[dn.refID] = mathexp.Results{Error: datasources.ErrDataSourceNotFound}
				}
				return
			}

			logger := logger.FromContext(ctx).New("datasourceType", firstNode.datasource.Type,
				"queryRefId", firstNode.refID,
				"datasourceUid", firstNode.datasource.UID,
				"datasourceVersion", firstNode.datasource.Version,
			)

			span.SetAttributes(
				attribute.String("datasource.type", firstNode.datasource.Type),
				attribute.String("datasource.uid", firstNode.datasource.UID),
			)

			// Build a single request carrying one query per node in the group.
			req := &backend.QueryDataRequest{
				PluginContext: pCtx,
				Headers:       firstNode.request.Headers,
			}

			for _, dn := range nodeGroup {
				req.Queries = append(req.Queries, backend.DataQuery{
					RefID:         dn.refID,
					MaxDataPoints: dn.maxDP,
					Interval:      time.Duration(int64(time.Millisecond) * dn.intervalMS),
					JSON:          dn.query,
					TimeRange:     dn.timeRange.AbsoluteTime(now),
					QueryType:     dn.queryType,
				})
			}

			// instrument records the span status, a debug log line, and the
			// datasource-request metric for one request outcome.
			instrument := func(e error, rt string) {
				respStatus := "success"
				responseType := rt
				if e != nil {
					responseType = "error"
					respStatus = "failure"
					span.SetStatus(codes.Error, "failed to query data source")
					span.RecordError(e)
				}
				logger.Debug("Data source queried", "responseType", responseType)
				useDataplane := strings.HasPrefix(responseType, "dataplane-")
				s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
			}

			resp, err := s.dataService.QueryData(ctx, req)
			if err != nil {
				for _, dn := range nodeGroup {
					vars[dn.refID] = mathexp.Results{Error: MakeQueryError(firstNode.refID, firstNode.datasource.UID, err)}
				}
				instrument(err, "")
				return
			}

			for _, dn := range nodeGroup {
				dataFrames, err := getResponseFrame(logger, resp, dn.refID)
				if err != nil {
					// NOTE(review): this return abandons the remaining nodes in the
					// group without writing a vars entry for them — confirm that is
					// intended rather than `continue`.
					vars[dn.refID] = mathexp.Results{Error: MakeQueryError(dn.refID, dn.datasource.UID, err)}
					instrument(err, "")
					return
				}

				var result mathexp.Results
				responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames)
				if err != nil {
					result.Error = makeConversionError(dn.RefID(), err)
				}
				instrument(err, responseType)
				vars[dn.refID] = result
			}
		}()
	}
}
// Execute runs the node and adds the results to vars. If the node requires
// other nodes they must have already been executed and their results must
// already be in vars.
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
	logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version)
	ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
	defer span.End()
	pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, dn.datasource.Type, dn.request.User, dn.datasource)
	if err != nil {
		return mathexp.Results{}, err
	}

	span.SetAttributes(
		attribute.String("datasource.type", dn.datasource.Type),
		attribute.String("datasource.uid", dn.datasource.UID),
	)

	// Build a single-query request for this node.
	req := &backend.QueryDataRequest{
		PluginContext: pCtx,
		Queries: []backend.DataQuery{
			{
				RefID:         dn.refID,
				MaxDataPoints: dn.maxDP,
				Interval:      time.Duration(int64(time.Millisecond) * dn.intervalMS),
				JSON:          dn.query,
				TimeRange:     dn.timeRange.AbsoluteTime(now),
				QueryType:     dn.queryType,
			},
		},
		Headers: dn.request.Headers,
	}

	// Deferred instrumentation reads the named return e, so it observes the
	// final error regardless of which return path is taken.
	responseType := "unknown"
	respStatus := "success"
	defer func() {
		if e != nil {
			responseType = "error"
			respStatus = "failure"
			span.SetStatus(codes.Error, "failed to query data source")
			span.RecordError(e)
		}
		logger.Debug("Data source queried", "responseType", responseType)
		useDataplane := strings.HasPrefix(responseType, "dataplane-")
		s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
	}()

	resp, err := s.dataService.QueryData(ctx, req)
	if err != nil {
		return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
	}

	dataFrames, err := getResponseFrame(logger, resp, dn.refID)
	if err != nil {
		return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
	}

	var result mathexp.Results
	// If the datasource node is an input to a SQL expression,
	// the data must be in the Long format
	if dn.isInputToSQLExpr {
		var needsConversion bool
		// Convert it if Multi:
		if len(dataFrames) > 1 {
			needsConversion = true
		}
		// Convert it if Wide (has labels):
		if len(dataFrames) == 1 {
			for _, field := range dataFrames[0].Fields {
				if len(field.Labels) > 0 {
					needsConversion = true
					break
				}
			}
		}
		if needsConversion {
			convertedFrames, err := ConvertToFullLong(dataFrames)
			if err != nil {
				return result, fmt.Errorf("failed to convert data frames to long format for sql: %w", err)
			}
			result.Values = mathexp.Values{
				mathexp.TableData{Frame: convertedFrames[0]},
			}
			return result, nil
		}
		// Otherwise it is already Long format; return as is
		// NOTE(review): indexing dataFrames[0] assumes getResponseFrame never
		// returns an empty, error-free slice — confirm.
		result.Values = mathexp.Values{
			mathexp.TableData{Frame: dataFrames[0]},
		}
		return result, nil
	}

	responseType, result, err = s.converter.Convert(ctx, dn.datasource.Type, dataFrames)
	if err != nil {
		err = makeConversionError(dn.refID, err)
	}
	return result, err
}