| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | package expr | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"errors" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"net/http" | 
					
						
							|  |  |  | 	"strings" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/backend" | 
					
						
							|  |  |  | 	jsoniter "github.com/json-iterator/go" | 
					
						
							|  |  |  | 	"gonum.org/v1/gonum/graph/simple" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/api/response" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/expr/mathexp" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/expr/ml" | 
					
						
							| 
									
										
										
										
											2023-09-25 18:10:47 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/plugins" | 
					
						
							| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/plugins/httpresponsesender" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var ( | 
					
						
							|  |  |  | 	errMLPluginDoesNotExist = fmt.Errorf("expression type Machine Learning cannot be executed. Plugin '%s' must be installed and initialized", mlPluginID) | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | const ( | 
					
						
							|  |  |  | 	// mlDatasourceID is similar to a fake ID for CMDNode. There is no specific reason for the selection of this value.
 | 
					
						
							|  |  |  | 	mlDatasourceID = -200 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// DatasourceUID is the string constant used as the datasource name in requests
 | 
					
						
							|  |  |  | 	// to identify it as an expression command when use in Datasource.UID.
 | 
					
						
							|  |  |  | 	MLDatasourceUID = "__ml__" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// mlPluginID is a known constant and used in other places of the code
 | 
					
						
							|  |  |  | 	mlPluginID = "grafana-ml-app" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // MLNode is a node of expression tree that evaluates the expression by sending the payload to Machine Learning back-end.
 | 
					
						
							|  |  |  | // See ml.UnmarshalCommand for supported commands.
 | 
					
						
							|  |  |  | type MLNode struct { | 
					
						
							|  |  |  | 	baseNode | 
					
						
							|  |  |  | 	command   ml.Command | 
					
						
							|  |  |  | 	TimeRange TimeRange | 
					
						
							|  |  |  | 	request   *Request | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NodeType returns the data pipeline node type.
 | 
					
						
							|  |  |  | func (m *MLNode) NodeType() NodeType { | 
					
						
							|  |  |  | 	return TypeMLNode | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-09-14 01:58:16 +08:00
										 |  |  | // NodeType returns the data pipeline node type.
 | 
					
						
							|  |  |  | func (m *MLNode) NeedsVars() []string { | 
					
						
							|  |  |  | 	return []string{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | // Execute initializes plugin API client,  executes a ml.Command and then converts the result of the execution.
 | 
					
						
							|  |  |  | // Returns non-empty mathexp.Results if evaluation was successful. Returns QueryError if command execution failed
 | 
					
						
							|  |  |  | func (m *MLNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) { | 
					
						
							|  |  |  | 	logger := logger.FromContext(ctx).New("datasourceType", mlPluginID, "queryRefId", m.refID) | 
					
						
							|  |  |  | 	var result mathexp.Results | 
					
						
							|  |  |  | 	timeRange := m.TimeRange.AbsoluteTime(now) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// get the plugin configuration that will be used by client (auth, host, etc)
 | 
					
						
							|  |  |  | 	pCtx, err := s.pCtxProvider.Get(ctx, mlPluginID, m.request.User, m.request.OrgId) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2023-09-25 18:10:47 +08:00
										 |  |  | 		if errors.Is(err, plugins.ErrPluginNotRegistered) { | 
					
						
							| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | 			return result, errMLPluginDoesNotExist | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return result, fmt.Errorf("failed to get plugin settings: %w", err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Plugin must be initialized by the admin first. That will create service account, and update plugin settings so all requests can use it.
 | 
					
						
							|  |  |  | 	// Fail if it is not initialized.
 | 
					
						
							|  |  |  | 	if pCtx.AppInstanceSettings == nil || !jsoniter.Get(pCtx.AppInstanceSettings.JSONData, "initialized").ToBool() { | 
					
						
							|  |  |  | 		return mathexp.Results{}, errMLPluginDoesNotExist | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// responseType and respStatus will be updated below. Use defer to ensure that debug log message is always emitted
 | 
					
						
							|  |  |  | 	responseType := "unknown" | 
					
						
							|  |  |  | 	respStatus := "success" | 
					
						
							|  |  |  | 	defer func() { | 
					
						
							|  |  |  | 		if e != nil { | 
					
						
							|  |  |  | 			responseType = "error" | 
					
						
							|  |  |  | 			respStatus = "failure" | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		logger.Debug("Data source queried", "responseType", responseType) | 
					
						
							|  |  |  | 		useDataplane := strings.HasPrefix("dataplane-", responseType) | 
					
						
							| 
									
										
										
										
											2025-04-11 02:51:44 +08:00
										 |  |  | 		s.metrics.DSRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), mlPluginID).Inc() | 
					
						
							| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Execute the command and provide callback function for sending a request via plugin API.
 | 
					
						
							|  |  |  | 	// This lets us make commands abstracted from peculiarities of the transfer protocol.
 | 
					
						
							|  |  |  | 	data, err := m.command.Execute(timeRange.From, timeRange.To, func(method string, path string, payload []byte) (response.Response, error) { | 
					
						
							|  |  |  | 		crReq := &backend.CallResourceRequest{ | 
					
						
							|  |  |  | 			PluginContext: pCtx, | 
					
						
							|  |  |  | 			Path:          path, | 
					
						
							|  |  |  | 			Method:        method, | 
					
						
							|  |  |  | 			URL:           path, | 
					
						
							|  |  |  | 			Headers:       make(map[string][]string, len(m.request.Headers)), | 
					
						
							|  |  |  | 			Body:          payload, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// copy headers from the request to evaluate the expression pipeline. Usually this contains information from upstream, e.g. FromAlert
 | 
					
						
							|  |  |  | 		for key, val := range m.request.Headers { | 
					
						
							|  |  |  | 			crReq.SetHTTPHeader(key, val) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		resp := response.CreateNormalResponse(make(http.Header), nil, 0) | 
					
						
							|  |  |  | 		httpSender := httpresponsesender.New(resp) | 
					
						
							|  |  |  | 		err = s.pluginsClient.CallResource(ctx, crReq, httpSender) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return resp, nil | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2023-07-21 18:38:29 +08:00
										 |  |  | 		return result, MakeQueryError(m.refID, "ml", err) | 
					
						
							| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// data is not guaranteed to be specified. In this case simulate NoData scenario
 | 
					
						
							|  |  |  | 	if data == nil { | 
					
						
							|  |  |  | 		data = &backend.QueryDataResponse{Responses: map[string]backend.DataResponse{}} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-13 22:58:39 +08:00
										 |  |  | 	dataFrames, err := getResponseFrame(logger, data, m.refID) | 
					
						
							| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2023-07-21 18:38:29 +08:00
										 |  |  | 		return mathexp.Results{}, MakeQueryError(m.refID, "ml", err) | 
					
						
							| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// process the response the same way DSNode does. Use plugin ID as data source type. Semantically, they are the same.
 | 
					
						
							| 
									
										
										
										
											2025-04-03 21:36:02 +08:00
										 |  |  | 	responseType, result, err = s.converter.Convert(ctx, mlPluginID, dataFrames, false) | 
					
						
							| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | 	return result, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *Service) buildMLNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (Node, error) { | 
					
						
							|  |  |  | 	if rn.TimeRange == nil { | 
					
						
							|  |  |  | 		return nil, errors.New("time range must be specified") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	cmd, err := ml.UnmarshalCommand(rn.QueryRaw, s.cfg.AppURL) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return &MLNode{ | 
					
						
							|  |  |  | 		baseNode: baseNode{ | 
					
						
							| 
									
										
										
										
											2023-08-18 19:49:59 +08:00
										 |  |  | 			id:    rn.idx, | 
					
						
							| 
									
										
										
										
											2023-07-14 01:37:50 +08:00
										 |  |  | 			refID: rn.RefID, | 
					
						
							|  |  |  | 		}, | 
					
						
							|  |  |  | 		TimeRange: rn.TimeRange, | 
					
						
							|  |  |  | 		command:   cmd, | 
					
						
							|  |  |  | 		request:   req, | 
					
						
							|  |  |  | 	}, nil | 
					
						
							|  |  |  | } |