mirror of https://github.com/grafana/grafana.git
				
				
				
			
		
			
				
	
	
		
			157 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			157 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
package expr
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"net/http"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/grafana/grafana-plugin-sdk-go/backend"
 | 
						|
	jsoniter "github.com/json-iterator/go"
 | 
						|
	"gonum.org/v1/gonum/graph/simple"
 | 
						|
 | 
						|
	"github.com/grafana/grafana/pkg/api/response"
 | 
						|
	"github.com/grafana/grafana/pkg/expr/mathexp"
 | 
						|
	"github.com/grafana/grafana/pkg/expr/ml"
 | 
						|
	"github.com/grafana/grafana/pkg/plugins"
 | 
						|
	"github.com/grafana/grafana/pkg/plugins/httpresponsesender"
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	errMLPluginDoesNotExist = fmt.Errorf("expression type Machine Learning cannot be executed. Plugin '%s' must be installed and initialized", mlPluginID)
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	// mlDatasourceID is similar to a fake ID for CMDNode. There is no specific reason for the selection of this value.
 | 
						|
	mlDatasourceID = -200
 | 
						|
 | 
						|
	// DatasourceUID is the string constant used as the datasource name in requests
 | 
						|
	// to identify it as an expression command when use in Datasource.UID.
 | 
						|
	MLDatasourceUID = "__ml__"
 | 
						|
 | 
						|
	// mlPluginID is a known constant and used in other places of the code
 | 
						|
	mlPluginID = "grafana-ml-app"
 | 
						|
)
 | 
						|
 | 
						|
// MLNode is a node of expression tree that evaluates the expression by sending the payload to Machine Learning back-end.
 | 
						|
// See ml.UnmarshalCommand for supported commands.
 | 
						|
type MLNode struct {
 | 
						|
	baseNode
 | 
						|
	command   ml.Command
 | 
						|
	TimeRange TimeRange
 | 
						|
	request   *Request
 | 
						|
}
 | 
						|
 | 
						|
// NodeType returns the data pipeline node type.
 | 
						|
func (m *MLNode) NodeType() NodeType {
 | 
						|
	return TypeMLNode
 | 
						|
}
 | 
						|
 | 
						|
// NodeType returns the data pipeline node type.
 | 
						|
func (m *MLNode) NeedsVars() []string {
 | 
						|
	return []string{}
 | 
						|
}
 | 
						|
 | 
						|
// Execute initializes plugin API client,  executes a ml.Command and then converts the result of the execution.
 | 
						|
// Returns non-empty mathexp.Results if evaluation was successful. Returns QueryError if command execution failed
 | 
						|
func (m *MLNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
 | 
						|
	logger := logger.FromContext(ctx).New("datasourceType", mlPluginID, "queryRefId", m.refID)
 | 
						|
	var result mathexp.Results
 | 
						|
	timeRange := m.TimeRange.AbsoluteTime(now)
 | 
						|
 | 
						|
	// get the plugin configuration that will be used by client (auth, host, etc)
 | 
						|
	pCtx, err := s.pCtxProvider.Get(ctx, mlPluginID, m.request.User, m.request.OrgId)
 | 
						|
	if err != nil {
 | 
						|
		if errors.Is(err, plugins.ErrPluginNotRegistered) {
 | 
						|
			return result, errMLPluginDoesNotExist
 | 
						|
		}
 | 
						|
		return result, fmt.Errorf("failed to get plugin settings: %w", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// Plugin must be initialized by the admin first. That will create service account, and update plugin settings so all requests can use it.
 | 
						|
	// Fail if it is not initialized.
 | 
						|
	if pCtx.AppInstanceSettings == nil || !jsoniter.Get(pCtx.AppInstanceSettings.JSONData, "initialized").ToBool() {
 | 
						|
		return mathexp.Results{}, errMLPluginDoesNotExist
 | 
						|
	}
 | 
						|
 | 
						|
	// responseType and respStatus will be updated below. Use defer to ensure that debug log message is always emitted
 | 
						|
	responseType := "unknown"
 | 
						|
	respStatus := "success"
 | 
						|
	defer func() {
 | 
						|
		if e != nil {
 | 
						|
			responseType = "error"
 | 
						|
			respStatus = "failure"
 | 
						|
		}
 | 
						|
		logger.Debug("Data source queried", "responseType", responseType)
 | 
						|
		useDataplane := strings.HasPrefix("dataplane-", responseType)
 | 
						|
		s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), mlPluginID).Inc()
 | 
						|
	}()
 | 
						|
 | 
						|
	// Execute the command and provide callback function for sending a request via plugin API.
 | 
						|
	// This lets us make commands abstracted from peculiarities of the transfer protocol.
 | 
						|
	data, err := m.command.Execute(timeRange.From, timeRange.To, func(method string, path string, payload []byte) (response.Response, error) {
 | 
						|
		crReq := &backend.CallResourceRequest{
 | 
						|
			PluginContext: pCtx,
 | 
						|
			Path:          path,
 | 
						|
			Method:        method,
 | 
						|
			URL:           path,
 | 
						|
			Headers:       make(map[string][]string, len(m.request.Headers)),
 | 
						|
			Body:          payload,
 | 
						|
		}
 | 
						|
 | 
						|
		// copy headers from the request to evaluate the expression pipeline. Usually this contains information from upstream, e.g. FromAlert
 | 
						|
		for key, val := range m.request.Headers {
 | 
						|
			crReq.SetHTTPHeader(key, val)
 | 
						|
		}
 | 
						|
 | 
						|
		resp := response.CreateNormalResponse(make(http.Header), nil, 0)
 | 
						|
		httpSender := httpresponsesender.New(resp)
 | 
						|
		err = s.pluginsClient.CallResource(ctx, crReq, httpSender)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		return resp, nil
 | 
						|
	})
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		return result, MakeQueryError(m.refID, "ml", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// data is not guaranteed to be specified. In this case simulate NoData scenario
 | 
						|
	if data == nil {
 | 
						|
		data = &backend.QueryDataResponse{Responses: map[string]backend.DataResponse{}}
 | 
						|
	}
 | 
						|
 | 
						|
	dataFrames, err := getResponseFrame(logger, data, m.refID)
 | 
						|
	if err != nil {
 | 
						|
		return mathexp.Results{}, MakeQueryError(m.refID, "ml", err)
 | 
						|
	}
 | 
						|
 | 
						|
	// process the response the same way DSNode does. Use plugin ID as data source type. Semantically, they are the same.
 | 
						|
	responseType, result, err = s.converter.Convert(ctx, mlPluginID, dataFrames, s.allowLongFrames)
 | 
						|
	return result, err
 | 
						|
}
 | 
						|
 | 
						|
func (s *Service) buildMLNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (Node, error) {
 | 
						|
	if rn.TimeRange == nil {
 | 
						|
		return nil, errors.New("time range must be specified")
 | 
						|
	}
 | 
						|
 | 
						|
	cmd, err := ml.UnmarshalCommand(rn.QueryRaw, s.cfg.AppURL)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &MLNode{
 | 
						|
		baseNode: baseNode{
 | 
						|
			id:    rn.idx,
 | 
						|
			refID: rn.RefID,
 | 
						|
		},
 | 
						|
		TimeRange: rn.TimeRange,
 | 
						|
		command:   cmd,
 | 
						|
		request:   req,
 | 
						|
	}, nil
 | 
						|
}
 |