| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | package expr | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							| 
									
										
										
										
											2022-10-27 04:13:58 +08:00
										 |  |  | 	"time" | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/expr/mathexp" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"gonum.org/v1/gonum/graph/simple" | 
					
						
							|  |  |  | 	"gonum.org/v1/gonum/graph/topo" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							// NodeType identifies what kind of node a data pipeline Node is: an
							// expression command or a datasource query.
							type NodeType int

							const (
								// TypeCMDNode marks a node that runs an expression command.
								TypeCMDNode NodeType = iota
								// TypeDatasourceNode marks a node that runs a datasource query.
								TypeDatasourceNode
							)

							// String returns the display name of the node type: "Expression" for
							// TypeCMDNode, "Datasource" for TypeDatasourceNode and "Unknown" for any
							// other value.
							func (nt NodeType) String() string {
								if nt == TypeCMDNode {
									return "Expression"
								}
								if nt == TypeDatasourceNode {
									return "Datasource"
								}
								return "Unknown"
							}
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | // Node is a node in a Data Pipeline. Node is either a expression command or a datasource query.
 | 
					
						
							|  |  |  | type Node interface { | 
					
						
							|  |  |  | 	ID() int64 // ID() allows the gonum graph node interface to be fulfilled
 | 
					
						
							|  |  |  | 	NodeType() NodeType | 
					
						
							|  |  |  | 	RefID() string | 
					
						
							| 
									
										
										
										
											2022-10-27 04:13:58 +08:00
										 |  |  | 	Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	String() string | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // DataPipeline is an ordered set of nodes returned from DPGraph processing.
 | 
					
						
							|  |  |  | type DataPipeline []Node | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // execute runs all the command/datasource requests in the pipeline return a
 | 
					
						
							|  |  |  | // map of the refId of the of each command
 | 
					
						
							| 
									
										
										
										
											2022-10-27 04:13:58 +08:00
										 |  |  | func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) { | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	vars := make(mathexp.Vars) | 
					
						
							|  |  |  | 	for _, node := range *dp { | 
					
						
							| 
									
										
										
										
											2022-10-27 04:13:58 +08:00
										 |  |  | 		res, err := node.Execute(c, now, vars, s) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		vars[node.RefID()] = res | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return vars, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // BuildPipeline builds a graph of the nodes, and returns the nodes in an
 | 
					
						
							|  |  |  | // executable order.
 | 
					
						
							| 
									
										
										
										
											2021-04-23 22:52:32 +08:00
										 |  |  | func (s *Service) buildPipeline(req *Request) (DataPipeline, error) { | 
					
						
							| 
									
										
										
										
											2021-03-08 14:02:49 +08:00
										 |  |  | 	graph, err := s.buildDependencyGraph(req) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	nodes, err := buildExecutionOrder(graph) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nodes, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // buildDependencyGraph returns a dependency graph for a set of queries.
 | 
					
						
							| 
									
										
										
										
											2021-04-23 22:52:32 +08:00
										 |  |  | func (s *Service) buildDependencyGraph(req *Request) (*simple.DirectedGraph, error) { | 
					
						
							| 
									
										
										
										
											2021-03-08 14:02:49 +08:00
										 |  |  | 	graph, err := s.buildGraph(req) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	registry := buildNodeRegistry(graph) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := buildGraphEdges(graph, registry); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return graph, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // buildExecutionOrder returns a sequence of nodes ordered by dependency.
 | 
					
						
							|  |  |  | func buildExecutionOrder(graph *simple.DirectedGraph) ([]Node, error) { | 
					
						
							|  |  |  | 	sortedNodes, err := topo.Sort(graph) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	nodes := make([]Node, len(sortedNodes)) | 
					
						
							|  |  |  | 	for i, v := range sortedNodes { | 
					
						
							|  |  |  | 		nodes[i] = v.(Node) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nodes, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // buildNodeRegistry returns a lookup table for reference IDs to respective node.
 | 
					
						
							|  |  |  | func buildNodeRegistry(g *simple.DirectedGraph) map[string]Node { | 
					
						
							|  |  |  | 	res := make(map[string]Node) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	nodeIt := g.Nodes() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for nodeIt.Next() { | 
					
						
							|  |  |  | 		if dpNode, ok := nodeIt.Node().(Node); ok { | 
					
						
							|  |  |  | 			res[dpNode.RefID()] = dpNode | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return res | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // buildGraph creates a new graph populated with nodes for every query.
 | 
					
						
							| 
									
										
										
										
											2021-04-23 22:52:32 +08:00
										 |  |  | func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) { | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 	dp := simple.NewDirectedGraph() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-12-07 23:30:38 +08:00
										 |  |  | 	for _, query := range req.Queries { | 
					
						
							| 
									
										
										
										
											2021-12-17 00:51:46 +08:00
										 |  |  | 		if query.DataSource == nil || query.DataSource.Uid == "" { | 
					
						
							|  |  |  | 			return nil, fmt.Errorf("missing datasource uid in query with refId %v", query.RefID) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		rawQueryProp := make(map[string]interface{}) | 
					
						
							| 
									
										
										
										
											2021-04-23 22:52:32 +08:00
										 |  |  | 		queryBytes, err := query.JSON.MarshalJSON() | 
					
						
							| 
									
										
										
										
											2021-10-30 01:57:24 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-10-30 01:57:24 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-04-23 22:52:32 +08:00
										 |  |  | 		err = json.Unmarshal(queryBytes, &rawQueryProp) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		rn := &rawNode{ | 
					
						
							| 
									
										
										
										
											2021-12-17 00:51:46 +08:00
										 |  |  | 			Query:      rawQueryProp, | 
					
						
							|  |  |  | 			RefID:      query.RefID, | 
					
						
							|  |  |  | 			TimeRange:  query.TimeRange, | 
					
						
							|  |  |  | 			QueryType:  query.QueryType, | 
					
						
							|  |  |  | 			DataSource: query.DataSource, | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-10-30 01:57:24 +08:00
										 |  |  | 		var node Node | 
					
						
							| 
									
										
										
										
											2021-04-16 21:29:19 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-12-17 00:51:46 +08:00
										 |  |  | 		if IsDataSource(rn.DataSource.Uid) { | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 			node, err = buildCMDNode(dp, rn) | 
					
						
							| 
									
										
										
										
											2021-10-30 01:57:24 +08:00
										 |  |  | 		} else { | 
					
						
							| 
									
										
										
										
											2021-07-09 19:43:22 +08:00
										 |  |  | 			node, err = s.buildDSNode(dp, rn, req) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-10-30 01:57:24 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-10-30 01:57:24 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 		dp.AddNode(node) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return dp, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // buildGraphEdges generates graph edges based on each node's dependencies.
 | 
					
						
							|  |  |  | func buildGraphEdges(dp *simple.DirectedGraph, registry map[string]Node) error { | 
					
						
							|  |  |  | 	nodeIt := dp.Nodes() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for nodeIt.Next() { | 
					
						
							|  |  |  | 		node := nodeIt.Node().(Node) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if node.NodeType() != TypeCMDNode { | 
					
						
							|  |  |  | 			// datasource node, nothing to do for now. Although if we want expression results to be
 | 
					
						
							|  |  |  | 			// used as datasource query params some day this will need change
 | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		cmdNode := node.(*CMDNode) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		for _, neededVar := range cmdNode.Command.NeedsVars() { | 
					
						
							|  |  |  | 			neededNode, ok := registry[neededVar] | 
					
						
							|  |  |  | 			if !ok { | 
					
						
							|  |  |  | 				return fmt.Errorf("unable to find dependent node '%v'", neededVar) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			if neededNode.ID() == cmdNode.ID() { | 
					
						
							| 
									
										
										
										
											2022-09-22 03:14:11 +08:00
										 |  |  | 				return fmt.Errorf("expression '%v' cannot reference itself. Must be query or another expression", neededVar) | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-04-27 19:22:11 +08:00
										 |  |  | 			if cmdNode.CMDType == TypeClassicConditions { | 
					
						
							|  |  |  | 				if neededNode.NodeType() != TypeDatasourceNode { | 
					
						
							|  |  |  | 					return fmt.Errorf("only data source queries may be inputs to a classic condition, %v is a %v", neededVar, neededNode.NodeType()) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			if neededNode.NodeType() == TypeCMDNode { | 
					
						
							|  |  |  | 				if neededNode.(*CMDNode).CMDType == TypeClassicConditions { | 
					
						
							|  |  |  | 					return fmt.Errorf("classic conditions may not be the input for other expressions, but %v is the input for %v", neededVar, cmdNode.RefID()) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 20:17:00 +08:00
										 |  |  | 			edge := dp.NewEdge(neededNode, cmdNode) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			dp.SetEdge(edge) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } |