| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | package expr | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 	"strings" | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/backend" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/data" | 
					
						
							| 
									
										
										
										
											2023-04-18 20:04:51 +08:00
										 |  |  | 	"go.opentelemetry.io/otel/attribute" | 
					
						
							| 
									
										
										
										
											2023-01-30 16:38:51 +08:00
										 |  |  | 	"gonum.org/v1/gonum/graph/simple" | 
					
						
							| 
									
										
										
										
											2022-05-23 22:08:14 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-03 02:51:33 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/expr/classic" | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/expr/mathexp" | 
					
						
							| 
									
										
										
										
											2021-05-07 21:16:21 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/infra/log" | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/infra/tracing" | 
					
						
							| 
									
										
										
										
											2022-06-28 00:23:15 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/services/datasources" | 
					
						
							| 
									
										
										
										
											2023-04-13 00:24:34 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/services/featuremgmt" | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-05-07 21:16:21 +08:00
										 |  |  | var ( | 
					
						
							|  |  |  | 	logger = log.New("expr") | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-16 21:42:22 +08:00
										 |  |  | type QueryError struct { | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 	RefID         string | 
					
						
							|  |  |  | 	DatasourceUID string | 
					
						
							|  |  |  | 	Err           error | 
					
						
							| 
									
										
										
										
											2021-11-16 21:42:22 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (e QueryError) Error() string { | 
					
						
							|  |  |  | 	return fmt.Sprintf("failed to execute query %s: %s", e.RefID, e.Err) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-11-17 18:07:24 +08:00
										 |  |  | func (e QueryError) Unwrap() error { | 
					
						
							|  |  |  | 	return e.Err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-08-27 20:57:41 +08:00
										 |  |  | // baseNode includes common properties used across DPNodes.
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | type baseNode struct { | 
					
						
							|  |  |  | 	id    int64 | 
					
						
							|  |  |  | 	refID string | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type rawNode struct { | 
					
						
							| 
									
										
										
										
											2022-12-02 02:08:36 +08:00
										 |  |  | 	RefID      string `json:"refId"` | 
					
						
							|  |  |  | 	Query      map[string]interface{} | 
					
						
							|  |  |  | 	QueryType  string | 
					
						
							|  |  |  | 	TimeRange  TimeRange | 
					
						
							|  |  |  | 	DataSource *datasources.DataSource | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (rn *rawNode) GetCommandType() (c CommandType, err error) { | 
					
						
							|  |  |  | 	rawType, ok := rn.Query["type"] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return c, fmt.Errorf("no expression command type in query for refId %v", rn.RefID) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	typeString, ok := rawType.(string) | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return c, fmt.Errorf("expected expression command type to be a string, got type %T", rawType) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ParseCommandType(typeString) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // String returns a string representation of the node. In particular for
 | 
					
						
							| 
									
										
										
										
											2021-06-10 07:59:44 +08:00
										 |  |  | // %v formatting in error messages.
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | func (b *baseNode) String() string { | 
					
						
							|  |  |  | 	return b.refID | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // CMDNode is a DPNode that holds an expression command.
 | 
					
						
							|  |  |  | type CMDNode struct { | 
					
						
							|  |  |  | 	baseNode | 
					
						
							|  |  |  | 	CMDType CommandType | 
					
						
							|  |  |  | 	Command Command | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ID returns the id of the node so it can fulfill the gonum's graph Node interface.
 | 
					
						
							|  |  |  | func (b *baseNode) ID() int64 { | 
					
						
							|  |  |  | 	return b.id | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // RefID returns the refId of the node.
 | 
					
						
							|  |  |  | func (b *baseNode) RefID() string { | 
					
						
							|  |  |  | 	return b.refID | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NodeType returns the data pipeline node type.
 | 
					
						
							|  |  |  | func (gn *CMDNode) NodeType() NodeType { | 
					
						
							|  |  |  | 	return TypeCMDNode | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Execute runs the node and adds the results to vars. If the node requires
 | 
					
						
							|  |  |  | // other nodes they must have already been executed and their results must
 | 
					
						
							|  |  |  | // already by in vars.
 | 
					
						
							| 
									
										
										
										
											2023-04-18 20:04:51 +08:00
										 |  |  | func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) { | 
					
						
							|  |  |  | 	return gn.Command.Execute(ctx, now, vars, s.tracer) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func buildCMDNode(dp *simple.DirectedGraph, rn *rawNode) (*CMDNode, error) { | 
					
						
							|  |  |  | 	commandType, err := rn.GetCommandType() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2022-09-22 03:14:11 +08:00
										 |  |  | 		return nil, fmt.Errorf("invalid command type in expression '%v': %w", rn.RefID, err) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	node := &CMDNode{ | 
					
						
							|  |  |  | 		baseNode: baseNode{ | 
					
						
							|  |  |  | 			id:    dp.NewNode().ID(), | 
					
						
							|  |  |  | 			refID: rn.RefID, | 
					
						
							|  |  |  | 		}, | 
					
						
							| 
									
										
										
										
											2021-04-27 19:22:11 +08:00
										 |  |  | 		CMDType: commandType, | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	switch commandType { | 
					
						
							|  |  |  | 	case TypeMath: | 
					
						
							|  |  |  | 		node.Command, err = UnmarshalMathCommand(rn) | 
					
						
							|  |  |  | 	case TypeReduce: | 
					
						
							|  |  |  | 		node.Command, err = UnmarshalReduceCommand(rn) | 
					
						
							|  |  |  | 	case TypeResample: | 
					
						
							|  |  |  | 		node.Command, err = UnmarshalResampleCommand(rn) | 
					
						
							| 
									
										
										
										
											2021-03-03 02:51:33 +08:00
										 |  |  | 	case TypeClassicConditions: | 
					
						
							|  |  |  | 		node.Command, err = classic.UnmarshalConditionsCmd(rn.Query, rn.RefID) | 
					
						
							| 
									
										
										
										
											2022-09-26 22:05:44 +08:00
										 |  |  | 	case TypeThreshold: | 
					
						
							|  |  |  | 		node.Command, err = UnmarshalThresholdCommand(rn) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	default: | 
					
						
							| 
									
										
										
										
											2022-09-22 03:14:11 +08:00
										 |  |  | 		return nil, fmt.Errorf("expression command type '%v' in expression '%v' not implemented", commandType, rn.RefID) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2022-09-22 03:14:11 +08:00
										 |  |  | 		return nil, fmt.Errorf("failed to parse expression '%v': %w", rn.RefID, err) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return node, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							|  |  |  | 	defaultIntervalMS = int64(64) | 
					
						
							|  |  |  | 	defaultMaxDP      = int64(5000) | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // DSNode is a DPNode that holds a datasource request.
 | 
					
						
							|  |  |  | type DSNode struct { | 
					
						
							|  |  |  | 	baseNode | 
					
						
							| 
									
										
										
										
											2022-12-02 02:08:36 +08:00
										 |  |  | 	query      json.RawMessage | 
					
						
							|  |  |  | 	datasource *datasources.DataSource | 
					
						
							| 
									
										
										
										
											2021-01-16 00:33:50 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	orgID      int64 | 
					
						
							|  |  |  | 	queryType  string | 
					
						
							| 
									
										
										
										
											2021-04-23 22:52:32 +08:00
										 |  |  | 	timeRange  TimeRange | 
					
						
							| 
									
										
										
										
											2021-01-16 00:33:50 +08:00
										 |  |  | 	intervalMS int64 | 
					
						
							|  |  |  | 	maxDP      int64 | 
					
						
							| 
									
										
										
										
											2021-07-09 19:43:22 +08:00
										 |  |  | 	request    Request | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NodeType returns the data pipeline node type.
 | 
					
						
							|  |  |  | func (dn *DSNode) NodeType() NodeType { | 
					
						
							|  |  |  | 	return TypeDatasourceNode | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-09 19:43:22 +08:00
										 |  |  | func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (*DSNode, error) { | 
					
						
							| 
									
										
										
										
											2022-10-27 04:13:58 +08:00
										 |  |  | 	if rn.TimeRange == nil { | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	encodedQuery, err := json.Marshal(rn.Query) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	dsNode := &DSNode{ | 
					
						
							|  |  |  | 		baseNode: baseNode{ | 
					
						
							|  |  |  | 			id:    dp.NewNode().ID(), | 
					
						
							|  |  |  | 			refID: rn.RefID, | 
					
						
							|  |  |  | 		}, | 
					
						
							| 
									
										
										
										
											2022-12-02 02:08:36 +08:00
										 |  |  | 		orgID:      req.OrgId, | 
					
						
							|  |  |  | 		query:      json.RawMessage(encodedQuery), | 
					
						
							|  |  |  | 		queryType:  rn.QueryType, | 
					
						
							|  |  |  | 		intervalMS: defaultIntervalMS, | 
					
						
							|  |  |  | 		maxDP:      defaultMaxDP, | 
					
						
							|  |  |  | 		timeRange:  rn.TimeRange, | 
					
						
							|  |  |  | 		request:    *req, | 
					
						
							|  |  |  | 		datasource: rn.DataSource, | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var floatIntervalMS float64 | 
					
						
							| 
									
										
										
										
											2021-08-10 15:59:48 +08:00
										 |  |  | 	if rawIntervalMS, ok := rn.Query["intervalMs"]; ok { | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		if floatIntervalMS, ok = rawIntervalMS.(float64); !ok { | 
					
						
							|  |  |  | 			return nil, fmt.Errorf("expected intervalMs to be an float64, got type %T for refId %v", rawIntervalMS, rn.RefID) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		dsNode.intervalMS = int64(floatIntervalMS) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var floatMaxDP float64 | 
					
						
							| 
									
										
										
										
											2021-08-10 15:59:48 +08:00
										 |  |  | 	if rawMaxDP, ok := rn.Query["maxDataPoints"]; ok { | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		if floatMaxDP, ok = rawMaxDP.(float64); !ok { | 
					
						
							|  |  |  | 			return nil, fmt.Errorf("expected maxDataPoints to be an float64, got type %T for refId %v", rawMaxDP, rn.RefID) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		dsNode.maxDP = int64(floatMaxDP) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return dsNode, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Execute runs the node and adds the results to vars. If the node requires
 | 
					
						
							|  |  |  | // other nodes they must have already been executed and their results must
 | 
					
						
							|  |  |  | // already by in vars.
 | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) { | 
					
						
							| 
									
										
										
										
											2023-02-03 00:22:43 +08:00
										 |  |  | 	logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version) | 
					
						
							| 
									
										
										
										
											2023-04-18 20:04:51 +08:00
										 |  |  | 	ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-06-08 19:59:51 +08:00
										 |  |  | 	pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, dn.datasource.Type, dn.request.User, dn.datasource) | 
					
						
							| 
									
										
										
										
											2021-12-17 00:51:46 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2023-06-08 19:59:51 +08:00
										 |  |  | 		return mathexp.Results{}, err | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-04-18 20:04:51 +08:00
										 |  |  | 	span.SetAttributes("datasource.type", dn.datasource.Type, attribute.Key("datasource.type").String(dn.datasource.Type)) | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 	span.SetAttributes("datasource.uid", dn.datasource.UID, attribute.Key("datasource.uid").String(dn.datasource.UID)) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-28 20:40:06 +08:00
										 |  |  | 	req := &backend.QueryDataRequest{ | 
					
						
							| 
									
										
										
										
											2023-06-08 19:59:51 +08:00
										 |  |  | 		PluginContext: pCtx, | 
					
						
							| 
									
										
										
										
											2022-11-28 20:40:06 +08:00
										 |  |  | 		Queries: []backend.DataQuery{ | 
					
						
							|  |  |  | 			{ | 
					
						
							|  |  |  | 				RefID:         dn.refID, | 
					
						
							|  |  |  | 				MaxDataPoints: dn.maxDP, | 
					
						
							|  |  |  | 				Interval:      time.Duration(int64(time.Millisecond) * dn.intervalMS), | 
					
						
							|  |  |  | 				JSON:          dn.query, | 
					
						
							|  |  |  | 				TimeRange:     dn.timeRange.AbsoluteTime(now), | 
					
						
							|  |  |  | 				QueryType:     dn.queryType, | 
					
						
							|  |  |  | 			}, | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		}, | 
					
						
							| 
									
										
										
										
											2022-11-28 20:40:06 +08:00
										 |  |  | 		Headers: dn.request.Headers, | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 	responseType := "unknown" | 
					
						
							| 
									
										
										
										
											2023-04-18 07:12:44 +08:00
										 |  |  | 	respStatus := "success" | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 	defer func() { | 
					
						
							|  |  |  | 		if e != nil { | 
					
						
							|  |  |  | 			responseType = "error" | 
					
						
							| 
									
										
										
										
											2023-04-18 07:12:44 +08:00
										 |  |  | 			respStatus = "failure" | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 			span.AddEvents([]string{"error", "message"}, | 
					
						
							|  |  |  | 				[]tracing.EventValue{ | 
					
						
							|  |  |  | 					{Str: fmt.Sprintf("%v", err)}, | 
					
						
							|  |  |  | 					{Str: "failed to query data source"}, | 
					
						
							|  |  |  | 				}) | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 		logger.Debug("Data source queried", "responseType", responseType) | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 		useDataplane := strings.HasPrefix(responseType, "dataplane-") | 
					
						
							| 
									
										
										
										
											2023-04-18 07:12:44 +08:00
										 |  |  | 		s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane)).Inc() | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-28 20:40:06 +08:00
										 |  |  | 	resp, err := s.dataService.QueryData(ctx, req) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 		return mathexp.Results{}, QueryError{ | 
					
						
							|  |  |  | 			RefID:         dn.refID, | 
					
						
							|  |  |  | 			DatasourceUID: dn.datasource.UID, | 
					
						
							|  |  |  | 			Err:           err, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	dataFrames, err := getResponseFrame(resp, dn.refID) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return mathexp.Results{}, QueryError{ | 
					
						
							|  |  |  | 			RefID:         dn.refID, | 
					
						
							|  |  |  | 			DatasourceUID: dn.datasource.UID, | 
					
						
							|  |  |  | 			Err:           err, | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 	var result mathexp.Results | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 	responseType, result, err = convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger) | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 	return result, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | func getResponseFrame(resp *backend.QueryDataResponse, refID string) (data.Frames, error) { | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 	response, ok := resp.Responses[refID] | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 	if !ok { | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 		// This indicates that the RefID of the request was not included to the response, i.e. some problem in the data source plugin
 | 
					
						
							|  |  |  | 		keys := make([]string, 0, len(resp.Responses)) | 
					
						
							|  |  |  | 		for refID := range resp.Responses { | 
					
						
							|  |  |  | 			keys = append(keys, refID) | 
					
						
							| 
									
										
										
										
											2022-05-23 22:08:14 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 		logger.Warn("Can't find response by refID. Return nodata", "responseRefIds", keys) | 
					
						
							|  |  |  | 		return nil, nil | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-23 22:08:14 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 	if response.Error != nil { | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 		return nil, response.Error | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return response.Frames, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func convertDataFramesToResults(ctx context.Context, frames data.Frames, datasourceType string, s *Service, logger log.Logger) (string, mathexp.Results, error) { | 
					
						
							|  |  |  | 	if len(frames) == 0 { | 
					
						
							|  |  |  | 		return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, nil | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-07-14 21:18:12 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 	vals := make([]mathexp.Value, 0) | 
					
						
							| 
									
										
										
										
											2023-04-18 07:12:44 +08:00
										 |  |  | 	var dt data.FrameType | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 	dt, useDataplane, _ := shouldUseDataplane(frames, logger, s.features.IsEnabled(featuremgmt.FlagDisableSSEDataplane)) | 
					
						
							| 
									
										
										
										
											2023-04-18 07:12:44 +08:00
										 |  |  | 	if useDataplane { | 
					
						
							| 
									
										
										
										
											2023-04-13 00:24:34 +08:00
										 |  |  | 		logger.Debug("Handling SSE data source query through dataplane", "datatype", dt) | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 		result, err := handleDataplaneFrames(ctx, s.tracer, dt, frames) | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 		return fmt.Sprintf("dataplane-%s", dt), result, err | 
					
						
							| 
									
										
										
										
											2023-04-13 00:24:34 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 	if isAllFrameVectors(datasourceType, frames) { // Prometheus Specific Handling
 | 
					
						
							|  |  |  | 		vals, err := framesToNumbers(frames) | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 			return "", mathexp.Results{}, fmt.Errorf("failed to read frames as numbers: %w", err) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 		return "vector", mathexp.Results{Values: vals}, nil | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 	if len(frames) == 1 { | 
					
						
							|  |  |  | 		frame := frames[0] | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 		// Handle Untyped NoData
 | 
					
						
							|  |  |  | 		if len(frame.Fields) == 0 { | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 			return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NoData{Frame: frame}}}, nil | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Handle Numeric Table
 | 
					
						
							|  |  |  | 		if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && isNumberTable(frame) { | 
					
						
							|  |  |  | 			numberSet, err := extractNumberSet(frame) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 			if err != nil { | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 				return "", mathexp.Results{}, err | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 			for _, n := range numberSet { | 
					
						
							|  |  |  | 				vals = append(vals, n) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 			return "number set", mathexp.Results{ | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 				Values: vals, | 
					
						
							|  |  |  | 			}, nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-13 02:59:02 +08:00
										 |  |  | 	for _, frame := range frames { | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 		// Check for TimeSeriesTypeNot in InfluxDB queries. A data frame of this type will cause
 | 
					
						
							|  |  |  | 		// the WideToMany() function to error out, which results in unhealthy alerts.
 | 
					
						
							|  |  |  | 		// This check should be removed once inconsistencies in data source responses are solved.
 | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 		if frame.TimeSeriesSchema().Type == data.TimeSeriesTypeNot && datasourceType == datasources.DS_INFLUXDB { | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 			logger.Warn("Ignoring InfluxDB data frame due to missing numeric fields") | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 		var series []mathexp.Series | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 		series, err := WideToMany(frame) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 			return "", mathexp.Results{}, err | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 		for _, ser := range series { | 
					
						
							|  |  |  | 			vals = append(vals, ser) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-06-15 21:20:08 +08:00
										 |  |  | 	return "series set", mathexp.Results{ | 
					
						
							| 
									
										
										
										
											2023-01-19 02:06:10 +08:00
										 |  |  | 		Values: vals, // TODO vals can be empty. Should we replace with no-data?
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-23 22:08:14 +08:00
										 |  |  | func isAllFrameVectors(datasourceType string, frames data.Frames) bool { | 
					
						
							|  |  |  | 	if datasourceType != "prometheus" { | 
					
						
							|  |  |  | 		return false | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	allVector := false | 
					
						
							|  |  |  | 	for i, frame := range frames { | 
					
						
							|  |  |  | 		if frame.Meta != nil && frame.Meta.Custom != nil { | 
					
						
							|  |  |  | 			if sMap, ok := frame.Meta.Custom.(map[string]string); ok { | 
					
						
							|  |  |  | 				if sMap != nil { | 
					
						
							|  |  |  | 					if sMap["resultType"] == "vector" { | 
					
						
							|  |  |  | 						if i != 0 && !allVector { | 
					
						
							|  |  |  | 							break | 
					
						
							|  |  |  | 						} | 
					
						
							|  |  |  | 						allVector = true | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return allVector | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func framesToNumbers(frames data.Frames) ([]mathexp.Value, error) { | 
					
						
							|  |  |  | 	vals := make([]mathexp.Value, 0, len(frames)) | 
					
						
							|  |  |  | 	for _, frame := range frames { | 
					
						
							|  |  |  | 		if frame == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if len(frame.Fields) == 2 && frame.Fields[0].Len() == 1 { | 
					
						
							|  |  |  | 			// Can there be zero Len Field results that are being skipped?
 | 
					
						
							|  |  |  | 			valueField := frame.Fields[1] | 
					
						
							|  |  |  | 			if valueField.Type().Numeric() { // should be []float64
 | 
					
						
							|  |  |  | 				val, err := valueField.FloatAt(0) // FloatAt should not err if numeric
 | 
					
						
							|  |  |  | 				if err != nil { | 
					
						
							|  |  |  | 					return nil, fmt.Errorf("failed to read value of frame [%v] (RefID %v) of type [%v] as float: %w", frame.Name, frame.RefID, valueField.Type(), err) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 				n := mathexp.NewNumber(frame.Name, valueField.Labels) | 
					
						
							|  |  |  | 				n.SetValue(&val) | 
					
						
							|  |  |  | 				vals = append(vals, n) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return vals, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | func isNumberTable(frame *data.Frame) bool { | 
					
						
							|  |  |  | 	if frame == nil || frame.Fields == nil { | 
					
						
							|  |  |  | 		return false | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	numericCount := 0 | 
					
						
							|  |  |  | 	stringCount := 0 | 
					
						
							|  |  |  | 	otherCount := 0 | 
					
						
							|  |  |  | 	for _, field := range frame.Fields { | 
					
						
							|  |  |  | 		fType := field.Type() | 
					
						
							|  |  |  | 		switch { | 
					
						
							|  |  |  | 		case fType.Numeric(): | 
					
						
							|  |  |  | 			numericCount++ | 
					
						
							|  |  |  | 		case fType == data.FieldTypeString || fType == data.FieldTypeNullableString: | 
					
						
							|  |  |  | 			stringCount++ | 
					
						
							|  |  |  | 		default: | 
					
						
							|  |  |  | 			otherCount++ | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return numericCount == 1 && otherCount == 0 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func extractNumberSet(frame *data.Frame) ([]mathexp.Number, error) { | 
					
						
							|  |  |  | 	numericField := 0 | 
					
						
							|  |  |  | 	stringFieldIdxs := []int{} | 
					
						
							|  |  |  | 	stringFieldNames := []string{} | 
					
						
							|  |  |  | 	for i, field := range frame.Fields { | 
					
						
							|  |  |  | 		fType := field.Type() | 
					
						
							|  |  |  | 		switch { | 
					
						
							|  |  |  | 		case fType.Numeric(): | 
					
						
							|  |  |  | 			numericField = i | 
					
						
							|  |  |  | 		case fType == data.FieldTypeString || fType == data.FieldTypeNullableString: | 
					
						
							|  |  |  | 			stringFieldIdxs = append(stringFieldIdxs, i) | 
					
						
							|  |  |  | 			stringFieldNames = append(stringFieldNames, field.Name) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	numbers := make([]mathexp.Number, frame.Rows()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for rowIdx := 0; rowIdx < frame.Rows(); rowIdx++ { | 
					
						
							|  |  |  | 		val, _ := frame.FloatAt(numericField, rowIdx) | 
					
						
							|  |  |  | 		var labels data.Labels | 
					
						
							|  |  |  | 		for i := 0; i < len(stringFieldIdxs); i++ { | 
					
						
							|  |  |  | 			if i == 0 { | 
					
						
							|  |  |  | 				labels = make(data.Labels) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			key := stringFieldNames[i] // TODO check for duplicate string column names
 | 
					
						
							|  |  |  | 			val, _ := frame.ConcreteAt(stringFieldIdxs[i], rowIdx) | 
					
						
							|  |  |  | 			labels[key] = val.(string) // TODO check assertion / return error
 | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-11-16 23:39:28 +08:00
										 |  |  | 		n := mathexp.NewNumber(frame.Fields[numericField].Name, labels) | 
					
						
							| 
									
										
										
										
											2022-03-10 23:03:26 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		// The new value fields' configs gets pointed to the one in the original frame
 | 
					
						
							|  |  |  | 		n.Frame.Fields[0].Config = frame.Fields[numericField].Config | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		n.SetValue(&val) | 
					
						
							| 
									
										
										
										
											2022-03-10 23:03:26 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		numbers[rowIdx] = n | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return numbers, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // WideToMany converts a data package wide type Frame to one or multiple Series. A series
 | 
					
						
							|  |  |  | // is created for each value type column of wide frame.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This might not be a good idea long term, but works now as an adapter/shim.
 | 
					
						
							|  |  |  | func WideToMany(frame *data.Frame) ([]mathexp.Series, error) { | 
					
						
							|  |  |  | 	tsSchema := frame.TimeSeriesSchema() | 
					
						
							|  |  |  | 	if tsSchema.Type != data.TimeSeriesTypeWide { | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("input data must be a wide series but got type %s (input refid)", tsSchema.Type) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if len(tsSchema.ValueIndices) == 1 { | 
					
						
							|  |  |  | 		s, err := mathexp.SeriesFromFrame(frame) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return []mathexp.Series{s}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	series := []mathexp.Series{} | 
					
						
							|  |  |  | 	for _, valIdx := range tsSchema.ValueIndices { | 
					
						
							|  |  |  | 		l := frame.Rows() | 
					
						
							|  |  |  | 		f := data.NewFrameOfFieldTypes(frame.Name, l, frame.Fields[tsSchema.TimeIndex].Type(), frame.Fields[valIdx].Type()) | 
					
						
							|  |  |  | 		f.Fields[0].Name = frame.Fields[tsSchema.TimeIndex].Name | 
					
						
							|  |  |  | 		f.Fields[1].Name = frame.Fields[valIdx].Name | 
					
						
							| 
									
										
										
										
											2022-03-10 23:03:26 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		// The new value fields' configs gets pointed to the one in the original frame
 | 
					
						
							|  |  |  | 		f.Fields[1].Config = frame.Fields[valIdx].Config | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		if frame.Fields[valIdx].Labels != nil { | 
					
						
							|  |  |  | 			f.Fields[1].Labels = frame.Fields[valIdx].Labels.Copy() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		for i := 0; i < l; i++ { | 
					
						
							|  |  |  | 			f.SetRow(i, frame.Fields[tsSchema.TimeIndex].CopyAt(i), frame.Fields[valIdx].CopyAt(i)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		s, err := mathexp.SeriesFromFrame(f) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		series = append(series, s) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return series, nil | 
					
						
							|  |  |  | } |