mirror of https://github.com/grafana/grafana.git
				
				
				
			
		
			
				
	
	
		
			220 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			220 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
| package expr
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"strconv"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 | |
| 	"github.com/grafana/grafana/pkg/bus"
 | |
| 	"github.com/grafana/grafana/pkg/components/simplejson"
 | |
| 	"github.com/grafana/grafana/pkg/models"
 | |
| 	"github.com/grafana/grafana/pkg/plugins"
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 	"golang.org/x/net/context"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	expressionsQuerySummary *prometheus.SummaryVec
 | |
| )
 | |
| 
 | |
| func init() {
 | |
| 	expressionsQuerySummary = prometheus.NewSummaryVec(
 | |
| 		prometheus.SummaryOpts{
 | |
| 			Name:       "expressions_queries_duration_milliseconds",
 | |
| 			Help:       "Expressions query summary",
 | |
| 			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
 | |
| 		},
 | |
| 		[]string{"status"},
 | |
| 	)
 | |
| 
 | |
| 	prometheus.MustRegister(expressionsQuerySummary)
 | |
| }
 | |
| 
 | |
| // WrapTransformData creates and executes transform requests
 | |
| func (s *Service) WrapTransformData(ctx context.Context, query plugins.DataQuery) (*backend.QueryDataResponse, error) {
 | |
| 	req := Request{
 | |
| 		OrgId:   query.User.OrgId,
 | |
| 		Queries: []Query{},
 | |
| 	}
 | |
| 
 | |
| 	for _, q := range query.Queries {
 | |
| 		modelJSON, err := q.Model.MarshalJSON()
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		req.Queries = append(req.Queries, Query{
 | |
| 			JSON:          modelJSON,
 | |
| 			Interval:      time.Duration(q.IntervalMS) * time.Millisecond,
 | |
| 			RefID:         q.RefID,
 | |
| 			MaxDataPoints: q.MaxDataPoints,
 | |
| 			QueryType:     q.QueryType,
 | |
| 			TimeRange: TimeRange{
 | |
| 				From: query.TimeRange.GetFromAsTimeUTC(),
 | |
| 				To:   query.TimeRange.GetToAsTimeUTC(),
 | |
| 			},
 | |
| 		})
 | |
| 	}
 | |
| 	return s.TransformData(ctx, &req)
 | |
| }
 | |
| 
 | |
| // Request is similar to plugins.DataQuery but with the Time Ranges is per Query.
 | |
| type Request struct {
 | |
| 	Headers map[string]string
 | |
| 	Debug   bool
 | |
| 	OrgId   int64
 | |
| 	Queries []Query
 | |
| }
 | |
| 
 | |
| // Query is like plugins.DataSubQuery, but with a a time range, and only the UID
 | |
| // for the data source. Also interval is a time.Duration.
 | |
| type Query struct {
 | |
| 	RefID         string
 | |
| 	TimeRange     TimeRange
 | |
| 	DatasourceUID string
 | |
| 	JSON          json.RawMessage
 | |
| 	Interval      time.Duration
 | |
| 	QueryType     string
 | |
| 	MaxDataPoints int64
 | |
| }
 | |
| 
 | |
| // TimeRange is a time.Time based TimeRange.
 | |
| type TimeRange struct {
 | |
| 	From time.Time
 | |
| 	To   time.Time
 | |
| }
 | |
| 
 | |
| // TransformData takes Queries which are either expressions nodes
 | |
| // or are datasource requests.
 | |
| func (s *Service) TransformData(ctx context.Context, req *Request) (r *backend.QueryDataResponse, err error) {
 | |
| 	if s.isDisabled() {
 | |
| 		return nil, fmt.Errorf("server side expressions are disabled")
 | |
| 	}
 | |
| 
 | |
| 	start := time.Now()
 | |
| 	defer func() {
 | |
| 		var respStatus string
 | |
| 		switch {
 | |
| 		case err == nil:
 | |
| 			respStatus = "success"
 | |
| 		default:
 | |
| 			respStatus = "failure"
 | |
| 		}
 | |
| 		duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
 | |
| 		expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
 | |
| 	}()
 | |
| 
 | |
| 	// Build the pipeline from the request, checking for ordering issues (e.g. loops)
 | |
| 	// and parsing graph nodes from the queries.
 | |
| 	pipeline, err := s.BuildPipeline(req)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Execute the pipeline
 | |
| 	responses, err := s.ExecutePipeline(ctx, pipeline)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Get which queries have the Hide property so they those queries' results
 | |
| 	// can be excluded from the response.
 | |
| 	hidden, err := hiddenRefIDs(req.Queries)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	if len(hidden) != 0 {
 | |
| 		filteredRes := backend.NewQueryDataResponse()
 | |
| 		for refID, res := range responses.Responses {
 | |
| 			if _, ok := hidden[refID]; !ok {
 | |
| 				filteredRes.Responses[refID] = res
 | |
| 			}
 | |
| 		}
 | |
| 		responses = filteredRes
 | |
| 	}
 | |
| 
 | |
| 	return responses, nil
 | |
| }
 | |
| 
 | |
| func hiddenRefIDs(queries []Query) (map[string]struct{}, error) {
 | |
| 	hidden := make(map[string]struct{})
 | |
| 
 | |
| 	for _, query := range queries {
 | |
| 		hide := struct {
 | |
| 			Hide bool `json:"hide"`
 | |
| 		}{}
 | |
| 
 | |
| 		if err := json.Unmarshal(query.JSON, &hide); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if hide.Hide {
 | |
| 			hidden[query.RefID] = struct{}{}
 | |
| 		}
 | |
| 	}
 | |
| 	return hidden, nil
 | |
| }
 | |
| 
 | |
| // queryData is called used to query datasources that are not expression commands, but are used
 | |
| // alongside expressions and/or are the input of an expression command.
 | |
| func (s *Service) queryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
 | |
| 	if len(req.Queries) == 0 {
 | |
| 		return nil, fmt.Errorf("zero queries found in datasource request")
 | |
| 	}
 | |
| 
 | |
| 	datasourceID := int64(0)
 | |
| 	var datasourceUID string
 | |
| 
 | |
| 	if req.PluginContext.DataSourceInstanceSettings != nil {
 | |
| 		datasourceID = req.PluginContext.DataSourceInstanceSettings.ID
 | |
| 		datasourceUID = req.PluginContext.DataSourceInstanceSettings.UID
 | |
| 	}
 | |
| 
 | |
| 	getDsInfo := &models.GetDataSourceQuery{
 | |
| 		OrgId: req.PluginContext.OrgID,
 | |
| 		Id:    datasourceID,
 | |
| 		Uid:   datasourceUID,
 | |
| 	}
 | |
| 
 | |
| 	if err := bus.DispatchCtx(ctx, getDsInfo); err != nil {
 | |
| 		return nil, fmt.Errorf("could not find datasource: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Convert plugin-model (datasource) queries to tsdb queries
 | |
| 	queries := make([]plugins.DataSubQuery, len(req.Queries))
 | |
| 	for i, query := range req.Queries {
 | |
| 		sj, err := simplejson.NewJson(query.JSON)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		queries[i] = plugins.DataSubQuery{
 | |
| 			RefID:         query.RefID,
 | |
| 			IntervalMS:    query.Interval.Milliseconds(),
 | |
| 			MaxDataPoints: query.MaxDataPoints,
 | |
| 			QueryType:     query.QueryType,
 | |
| 			DataSource:    getDsInfo.Result,
 | |
| 			Model:         sj,
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// For now take Time Range from first query.
 | |
| 	timeRange := plugins.NewDataTimeRange(strconv.FormatInt(req.Queries[0].TimeRange.From.Unix()*1000, 10),
 | |
| 		strconv.FormatInt(req.Queries[0].TimeRange.To.Unix()*1000, 10))
 | |
| 
 | |
| 	tQ := plugins.DataQuery{
 | |
| 		TimeRange: &timeRange,
 | |
| 		Queries:   queries,
 | |
| 		Headers:   req.Headers,
 | |
| 	}
 | |
| 
 | |
| 	// Execute the converted queries
 | |
| 	tsdbRes, err := s.DataService.HandleRequest(ctx, getDsInfo.Result, tQ)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return tsdbRes.ToBackendDataResponse()
 | |
| }
 |